Make the bid / ask levels configurable and resolve issues with previous streams not being cancelled

This commit is contained in:
moshferatu 2024-04-23 03:39:09 -07:00
parent 479bbfd791
commit d885175945

View File

@ -7,6 +7,7 @@ using System.ComponentModel.DataAnnotations;
using System.Linq; using System.Linq;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Windows; using System.Windows;
using System.Windows.Input; using System.Windows.Input;
@ -27,12 +28,12 @@ using NinjaTrader.Core;
//This namespace holds Indicators in this folder and is required. Do not change it. //This namespace holds Indicators in this folder and is required. Do not change it.
namespace NinjaTrader.NinjaScript.Indicators namespace NinjaTrader.NinjaScript.Indicators
{ {
class MarketDepthProcessor class MarketDepthStream
{ {
public static ConcurrentDictionary<float, int> Asks = new ConcurrentDictionary<float, int>(); public ConcurrentDictionary<float, int> Asks { get; private set; }
public static ConcurrentDictionary<float, int> Bids = new ConcurrentDictionary<float, int>(); public ConcurrentDictionary<float, int> Bids { get; private set; }
static Dictionary<string, string> MonthCodes = new Dictionary<string, string> static readonly Dictionary<string, string> MonthCodes = new Dictionary<string, string>
{ {
{"01", "F"}, {"01", "F"},
{"02", "G"}, {"02", "G"},
@ -48,11 +49,11 @@ namespace NinjaTrader.NinjaScript.Indicators
{"12", "Z"} {"12", "Z"}
}; };
public static async Task StreamMarketDepth(Instrument instrument, int maxLevels, string host, int port) public async Task StreamMarketDepth(Instrument instrument, int maxLevels, string host, int port, CancellationToken cancellationToken)
{ {
Asks.Clear(); Asks = new ConcurrentDictionary<float, int>();
Bids.Clear(); Bids = new ConcurrentDictionary<float, int>();
string symbol = GetIQFeedSymbol(instrument); string symbol = GetIQFeedSymbol(instrument);
using (TcpClient client = new TcpClient(host, port)) using (TcpClient client = new TcpClient(host, port))
@ -60,7 +61,7 @@ namespace NinjaTrader.NinjaScript.Indicators
{ {
StringBuilder data = new StringBuilder(); StringBuilder data = new StringBuilder();
while (true) while (!cancellationToken.IsCancellationRequested)
{ {
byte[] buffer = new byte[4096]; byte[] buffer = new byte[4096];
int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length); int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
@ -103,7 +104,7 @@ namespace NinjaTrader.NinjaScript.Indicators
} }
} }
static string GetIQFeedSymbol(Instrument instrument) private string GetIQFeedSymbol(Instrument instrument)
{ {
// Example: ES 06-24 // Example: ES 06-24
string masterInstrument = instrument.MasterInstrument.Name; string masterInstrument = instrument.MasterInstrument.Name;
@ -117,13 +118,13 @@ namespace NinjaTrader.NinjaScript.Indicators
return "@" + masterInstrument + monthCode + year; return "@" + masterInstrument + monthCode + year;
} }
static async Task SendCommand(NetworkStream stream, string command) private async Task SendCommand(NetworkStream stream, string command)
{ {
byte[] commandBytes = Encoding.UTF8.GetBytes(command + "\r\n"); byte[] commandBytes = Encoding.UTF8.GetBytes(command + "\r\n");
await stream.WriteAsync(commandBytes, 0, commandBytes.Length); await stream.WriteAsync(commandBytes, 0, commandBytes.Length);
} }
static void ProcessMarketDepthMessage(string message) private void ProcessMarketDepthMessage(string message)
{ {
string[] fields = message.Split(','); string[] fields = message.Split(',');
@ -160,7 +161,7 @@ namespace NinjaTrader.NinjaScript.Indicators
} }
} }
static void TrimMarketDepth(int maxLevels) private void TrimMarketDepth(int maxLevels)
{ {
if (maxLevels <= 0) if (maxLevels <= 0)
return; return;
@ -199,6 +200,9 @@ namespace NinjaTrader.NinjaScript.Indicators
public class MarketDepth : Indicator public class MarketDepth : Indicator
{ {
private MarketDepthStream Stream = new MarketDepthStream();
private CancellationTokenSource StreamCancellation;
protected override void OnStateChange() protected override void OnStateChange()
{ {
if (State == State.SetDefaults) if (State == State.SetDefaults)
@ -216,34 +220,45 @@ namespace NinjaTrader.NinjaScript.Indicators
IsSuspendedWhileInactive = false; IsSuspendedWhileInactive = false;
MaxLevels = 0; MaxLevels = 0;
MaxWidth = 500; MaxWidth = 500;
BidStroke = new Stroke(Brushes.LimeGreen, DashStyleHelper.Solid, 3);
AskStroke = new Stroke(Brushes.Red, DashStyleHelper.Solid, 3);
Host = "127.0.0.1"; Host = "127.0.0.1";
Port = 9200; Port = 9200;
} }
else if (State == State.Configure)
{
}
else if (State == State.DataLoaded) else if (State == State.DataLoaded)
{ {
Task.Run(() => MarketDepthProcessor.StreamMarketDepth(Instrument, MaxLevels, Host, Port)); StreamCancellation = new CancellationTokenSource();
Task.Run(() => Stream.StreamMarketDepth(Instrument, MaxLevels, Host, Port, StreamCancellation.Token));
} }
else if (State == State.Historical) else if (State == State.Historical)
{ {
SetZOrder(-1); // Display behind bars on chart. SetZOrder(-1); // Display behind bars on chart.
} }
else if (State == State.Terminated)
{
if (StreamCancellation != null)
{
StreamCancellation.Cancel();
StreamCancellation.Dispose();
}
}
} }
protected override void OnRender(ChartControl chartControl, ChartScale chartScale) protected override void OnRender(ChartControl chartControl, ChartScale chartScale)
{ {
base.OnRender(chartControl, chartScale); base.OnRender(chartControl, chartScale);
if (MarketDepthProcessor.Asks.Count() <= 0 || MarketDepthProcessor.Bids.Count() <= 0) if (Stream.Asks == null || Stream.Bids == null)
return;
if (Stream.Asks.Count() <= 0 || Stream.Bids.Count() <= 0)
return; return;
// Find the max volume for visually scaling the indicator. // Find the max volume for visually scaling the indicator.
long maxVolume = Math.Max(MarketDepthProcessor.Asks.Values.ToList().DefaultIfEmpty(0).Max(), long maxVolume = Math.Max(Stream.Asks.Values.ToList().DefaultIfEmpty(0).Max(),
MarketDepthProcessor.Bids.Values.ToList().DefaultIfEmpty(0).Max()); Stream.Bids.Values.ToList().DefaultIfEmpty(0).Max());
foreach (var Ask in MarketDepthProcessor.Asks.ToArray()) foreach (var Ask in Stream.Asks.ToArray())
{ {
float price = Ask.Key; float price = Ask.Key;
int volume = Ask.Value; int volume = Ask.Value;
@ -251,10 +266,10 @@ namespace NinjaTrader.NinjaScript.Indicators
int length = (int)((double)volume / maxVolume * MaxWidth); int length = (int)((double)volume / maxVolume * MaxWidth);
int y = chartScale.GetYByValue(price); int y = chartScale.GetYByValue(price);
DrawHorizontalLine((ChartPanel.X + ChartPanel.W) - length, ChartPanel.X + ChartPanel.W, y, new Stroke(Brushes.Red, DashStyleHelper.Solid, 3)); DrawHorizontalLine((ChartPanel.X + ChartPanel.W) - length, ChartPanel.X + ChartPanel.W, y, AskStroke);
} }
foreach (var Bid in MarketDepthProcessor.Bids.ToArray()) foreach (var Bid in Stream.Bids.ToArray())
{ {
float price = Bid.Key; float price = Bid.Key;
int volume = Bid.Value; int volume = Bid.Value;
@ -262,7 +277,7 @@ namespace NinjaTrader.NinjaScript.Indicators
int length = (int)((double)volume / maxVolume * MaxWidth); int length = (int)((double)volume / maxVolume * MaxWidth);
int y = chartScale.GetYByValue(price); int y = chartScale.GetYByValue(price);
DrawHorizontalLine((ChartPanel.X + ChartPanel.W) - length, ChartPanel.X + ChartPanel.W, y, new Stroke(Brushes.LimeGreen, DashStyleHelper.Solid, 3)); DrawHorizontalLine((ChartPanel.X + ChartPanel.W) - length, ChartPanel.X + ChartPanel.W, y, BidStroke);
} }
} }
@ -294,11 +309,19 @@ namespace NinjaTrader.NinjaScript.Indicators
{ get; set; } { get; set; }
[NinjaScriptProperty] [NinjaScriptProperty]
[Display(Name = "IQFeed Host", Description = "The host on which the IQFeed connection is running", Order = 3, GroupName = "Market Depth")] [Display(Name = "Bid Level", Description = "Color and style of bid levels", Order = 3, GroupName = "Market Depth")]
public Stroke BidStroke { get; set; }
[NinjaScriptProperty]
[Display(Name = "Ask Level", Description = "Color and style of ask levels", Order = 4, GroupName = "Market Depth")]
public Stroke AskStroke { get; set; }
[NinjaScriptProperty]
[Display(Name = "IQFeed Host", Description = "The host on which the IQFeed connection is running", Order = 5, GroupName = "Market Depth")]
public string Host { get; set; } public string Host { get; set; }
[NinjaScriptProperty] [NinjaScriptProperty]
[Display(Name = "IQFeed Port", Description = "The port on which the IQFeed connection is listening", Order = 4, GroupName = "Market Depth")] [Display(Name = "IQFeed Port", Description = "The port on which the IQFeed connection is listening", Order = 6, GroupName = "Market Depth")]
public int Port { get; set; } public int Port { get; set; }
} }
} }
@ -310,18 +333,18 @@ namespace NinjaTrader.NinjaScript.Indicators
public partial class Indicator : NinjaTrader.Gui.NinjaScript.IndicatorRenderBase public partial class Indicator : NinjaTrader.Gui.NinjaScript.IndicatorRenderBase
{ {
private MarketDepth[] cacheMarketDepth; private MarketDepth[] cacheMarketDepth;
public MarketDepth MarketDepth(int maxLevels, int maxWidth, string host, int port) public MarketDepth MarketDepth(int maxLevels, int maxWidth, Stroke bidStroke, Stroke askStroke, string host, int port)
{ {
return MarketDepth(Input, maxLevels, maxWidth, host, port); return MarketDepth(Input, maxLevels, maxWidth, bidStroke, askStroke, host, port);
} }
public MarketDepth MarketDepth(ISeries<double> input, int maxLevels, int maxWidth, string host, int port) public MarketDepth MarketDepth(ISeries<double> input, int maxLevels, int maxWidth, Stroke bidStroke, Stroke askStroke, string host, int port)
{ {
if (cacheMarketDepth != null) if (cacheMarketDepth != null)
for (int idx = 0; idx < cacheMarketDepth.Length; idx++) for (int idx = 0; idx < cacheMarketDepth.Length; idx++)
if (cacheMarketDepth[idx] != null && cacheMarketDepth[idx].MaxLevels == maxLevels && cacheMarketDepth[idx].MaxWidth == maxWidth && cacheMarketDepth[idx].Host == host && cacheMarketDepth[idx].Port == port && cacheMarketDepth[idx].EqualsInput(input)) if (cacheMarketDepth[idx] != null && cacheMarketDepth[idx].MaxLevels == maxLevels && cacheMarketDepth[idx].MaxWidth == maxWidth && cacheMarketDepth[idx].BidStroke == bidStroke && cacheMarketDepth[idx].AskStroke == askStroke && cacheMarketDepth[idx].Host == host && cacheMarketDepth[idx].Port == port && cacheMarketDepth[idx].EqualsInput(input))
return cacheMarketDepth[idx]; return cacheMarketDepth[idx];
return CacheIndicator<MarketDepth>(new MarketDepth(){ MaxLevels = maxLevels, MaxWidth = maxWidth, Host = host, Port = port }, input, ref cacheMarketDepth); return CacheIndicator<MarketDepth>(new MarketDepth(){ MaxLevels = maxLevels, MaxWidth = maxWidth, BidStroke = bidStroke, AskStroke = askStroke, Host = host, Port = port }, input, ref cacheMarketDepth);
} }
} }
} }
@ -330,14 +353,14 @@ namespace NinjaTrader.NinjaScript.MarketAnalyzerColumns
{ {
public partial class MarketAnalyzerColumn : MarketAnalyzerColumnBase public partial class MarketAnalyzerColumn : MarketAnalyzerColumnBase
{ {
public Indicators.MarketDepth MarketDepth(int maxLevels, int maxWidth, string host, int port) public Indicators.MarketDepth MarketDepth(int maxLevels, int maxWidth, Stroke bidStroke, Stroke askStroke, string host, int port)
{ {
return indicator.MarketDepth(Input, maxLevels, maxWidth, host, port); return indicator.MarketDepth(Input, maxLevels, maxWidth, bidStroke, askStroke, host, port);
} }
public Indicators.MarketDepth MarketDepth(ISeries<double> input , int maxLevels, int maxWidth, string host, int port) public Indicators.MarketDepth MarketDepth(ISeries<double> input , int maxLevels, int maxWidth, Stroke bidStroke, Stroke askStroke, string host, int port)
{ {
return indicator.MarketDepth(input, maxLevels, maxWidth, host, port); return indicator.MarketDepth(input, maxLevels, maxWidth, bidStroke, askStroke, host, port);
} }
} }
} }
@ -346,14 +369,14 @@ namespace NinjaTrader.NinjaScript.Strategies
{ {
public partial class Strategy : NinjaTrader.Gui.NinjaScript.StrategyRenderBase public partial class Strategy : NinjaTrader.Gui.NinjaScript.StrategyRenderBase
{ {
public Indicators.MarketDepth MarketDepth(int maxLevels, int maxWidth, string host, int port) public Indicators.MarketDepth MarketDepth(int maxLevels, int maxWidth, Stroke bidStroke, Stroke askStroke, string host, int port)
{ {
return indicator.MarketDepth(Input, maxLevels, maxWidth, host, port); return indicator.MarketDepth(Input, maxLevels, maxWidth, bidStroke, askStroke, host, port);
} }
public Indicators.MarketDepth MarketDepth(ISeries<double> input , int maxLevels, int maxWidth, string host, int port) public Indicators.MarketDepth MarketDepth(ISeries<double> input , int maxLevels, int maxWidth, Stroke bidStroke, Stroke askStroke, string host, int port)
{ {
return indicator.MarketDepth(input, maxLevels, maxWidth, host, port); return indicator.MarketDepth(input, maxLevels, maxWidth, bidStroke, askStroke, host, port);
} }
} }
} }