Timer to track websocket-sharp messages in C#

I am using the websocket-sharp DLL (https://github.com/sta/websocket-sharp) in my Windows application to receive messages from a GDAX server. Everything works so far: the messages arrive and I process them. Where I am stuck is detecting when the messages stop coming. I can't find anything in the WebSocket.OnMessage event that helps me track when messages have stopped (I also tried EmitOnPing).

The messages I receive include a message of type 'heartbeat', which is sent every second. I want to add a separate timer control that checks whether heartbeat messages are arriving every second and, if they stop, reconnects to the server. But since no event fires when the messages stop coming, how do I track it? Where should I put the timer code that checks whether heartbeat messages have stopped?

I hope this explains where I am stuck. If anyone is willing to help and needs more information, please let me know.

Update

    private void _3_Load(object sender, EventArgs e)
    {          
        ConnectAndGetWebsocketFeedMessages();           
    }

    public delegate void WSOpen(string text);
    public delegate void WSMessage(string message);
    public delegate void WSError(string text);
    public delegate void WSClose(string text);

    private static string _endPoint = "wss://ws-feed.gdax.com";
    WebSocket ws = new WebSocket(_endPoint);

    private bool IsConnected { get; set; }
    private string ProductId { get; set; }

    string productId = "LTC-EUR";
    ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>();

    public void SetWebSocketSharpEvents()
    {
        ws.Log.Level = LogLevel.Trace;

        ws.OnOpen += (sender, e) =>
        {
            IsConnected = true;
            OnWSOpen("Connection Status :: Connected *********");
        };
        ws.EmitOnPing = true;
        ws.OnMessage += (sender, e) =>
        {
            if (e.IsPing)
            {
                OnWSMessage("ping received");
            }
            else
            {                    
                OnWSMessage(e.Data);
            }
        };

        ws.OnError += (sender, e) =>
        {
            IsConnected = false;
            OnWSError(e.Message); //An exception has occurred during an OnMessage event. An error has occurred in closing the connection.
            if (ws.IsAlive)
                ws.Close();
        };

        ws.OnClose += (sender, e) =>
        {
            IsConnected = false;
            OnWSClose("Close");
        };

        ws.ConnectAsync();
    }

    private void ConnectAndGetWebsocketFeedMessages()
    {            
        SetWebSocketSharpEvents();
    }

    private void SubscribeProduct(string sProductID)
    {
        if (IsConnected)
        {
            ProductId = sProductID;
            string data = "{\"type\": \"subscribe\", \"product_ids\": [\"" + sProductID + "\"]}";
            ws.Send(data);
            ws.Send("{\"type\": \"heartbeat\", \"on\": true}");
        }
    }

    void OnWSOpen(string text)
    {
        SubscribeProduct(productId);
        timer1.Interval = 1000;
        timer1.Tick += timer1_Tick;
        timer1.Start();
    }

    DateTime lastHeartbeatTime = DateTime.MinValue;
    bool isTimerStart = false;
    void OnWSMessage(string message)
    {
        concurrentQueue.Enqueue(message);
        SaveHeartbeatMessageTime(message);
        ProcessMessage(message);
    }

    private void SaveHeartbeatMessageTime(string jsonString)
    {
        var jToken = JToken.Parse(jsonString);

        var typeToken = jToken["type"];

        var type = typeToken.ToString();

        if (type == "heartbeat")
        {
            lastHeartbeatTime = DateTime.Now;
            this.Invoke(new MethodInvoker(delegate()
            {
                lbllastheartbeat.Text = lastHeartbeatTime.ToLongTimeString();
            }));               
        }
    }

    private void ProcessMessage(string message) {  }

    void OnWSError(string text) { }

    void OnWSClose(string text) { }

    bool isMessagesReceived = false;

    private void timer1_Tick(object sender, EventArgs e) // it stops working as soon as lbllastheartbeat gets some value
    {
        DateTime currentTime = DateTime.Now;
        TimeSpan duration = currentTime.Subtract(lastHeartbeatTime);
        this.Invoke(new MethodInvoker(delegate()
        {
            lblNow.Text = currentTime.ToLongTimeString();
        }));
        if (Int16.Parse(duration.ToString("ss")) > 1)
        {
            // reconnect here
        }
    }

Edit: I am using a Windows Forms timer control. It keeps calling the timer1_Tick method, and the OnWSMessage method is no longer called. How do I ensure that both run in parallel, and that the client reconnects if a message is missed or the messages stop coming?

Edit 2: The solutions provided below suggest adding the timer functionality to the OnMessage event, but what happens if I do not receive messages? If no messages are received, that code does nothing. I have a global variable that is set to the current time whenever a message arrives. Now I want to run a separate timer control that checks that variable: if the difference from the current time is more than 1 second, do something; otherwise keep checking.

Can anyone look into this and advise, please?

Update 2: I still want to do this with a System.Windows.Forms.Timer control and not System.Threading.Timer. I have added two labels to my Windows app: lbllastheartbeat (shows the time when the last heartbeat message was received) and lblNow (shows the current time when the timer fires).

Requirement: my timer checks whether any heartbeat message has been missed, using the 'lastHeartbeatTime' variable, which stores the time when the last heartbeat message was received.

I would appreciate it if anyone could review my code and point out what I am doing wrong, and where.

Kamenskuralski answered 7/11, 2017 at 4:57 Comment(6)
If I understand you correctly, you have an OnMessage event that gets raised when a message is received. In that event you could stop your timer, process the message, and then restart your timer. If you set your timer's Interval to one second (or whatever the max time between messages is that indicates no messages have been sent), then the timer's Elapsed (or Tick) event would be where you would know that no message has been received within one second. – Gossett
@Rufus L - can you please look at my code and advise why my timer1_Tick event is called endlessly and the OnMessage event is never reached? – Kamenskuralski
My solution below also resets the timer in OnOpen, so it will detect 2 seconds without messages even if no messages have arrived yet. – Munition
OnOpen fires only the first time, so yes, that will work, but I am talking about the situation where messages are coming and suddenly stop. In that situation the OnMessage event is not fired, so it will not call ResetTimeoutTimer(). – Kamenskuralski
I would appreciate it if you could help me with my timer1_Tick event. – Kamenskuralski
ResetTimeoutTimer, as its name indicates, resets the timer so that it starts counting again and fires its callback after X seconds (2 seconds in my example). When messages suddenly stop coming, nothing calls ResetTimeoutTimer, and that is good, because that's the whole point. When nothing resets the timer, it finally fires its callback. When the callback fires, you know that you did not receive any messages for the last 2 seconds and so should reconnect. – Munition

The answer has already been given: when you receive a message, start a timer that fires after your timeout period, and reset that timer every time you receive another message. But it seems you want a code example, so here it is (with comments):

System.Threading.Timer _timeoutTimer;
private readonly object _timeoutTimerLock = new object();
private void ResetTimeoutTimer() {
    // if you are sure you will never access this from multiple threads at the same time - remove lock
    lock (_timeoutTimerLock) {
        // initialize or reset the timer to fire once, after 2 seconds
        if (_timeoutTimer == null)
            _timeoutTimer = new System.Threading.Timer(ReconnectAfterTimeout, null, TimeSpan.FromSeconds(2), Timeout.InfiniteTimeSpan);
        else
            _timeoutTimer.Change(TimeSpan.FromSeconds(2), Timeout.InfiniteTimeSpan);
    }
}

private void StopTimeoutTimer() {
    // if you are sure you will never access this from multiple threads at the same time - remove lock
    lock (_timeoutTimerLock) {
        if (_timeoutTimer != null)
            _timeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }
}

private void ReconnectAfterTimeout(object state) {
    // reconnect here
}

public void SetWebSocketSharpEvents() {
    ws.Log.Level = LogLevel.Trace;

    ws.OnOpen += (sender, e) => {
        // start timer here so that if you don't get first message after 2 seconds - reconnect
        ResetTimeoutTimer();
        IsConnected = true;
        OnWSOpen("Connection Status :: Connected *********");
    };
    ws.EmitOnPing = true;
    ws.OnMessage += (sender, e) => {
        // and here
        ResetTimeoutTimer();
        if (e.IsPing) {
            OnWSMessage("ping received");
        }
        else {
            OnWSMessage(e.Data);
        }
    };

    ws.OnError += (sender, e) => {
        // stop it here
        StopTimeoutTimer();
        IsConnected = false;
        OnWSError(e.Message); //An exception has occurred during an OnMessage event. An error has occurred in closing the connection.
        if (ws.IsAlive)
            ws.Close();
    };

    ws.OnClose += (sender, e) => {
        // and here
        StopTimeoutTimer();
        IsConnected = false;
        OnWSClose("Close");
    };

    ws.ConnectAsync();
}
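
The reset-on-message idea can also be packaged as a small reusable class. This is only a sketch under assumptions: `ReceiveWatchdog` is a hypothetical name and is not part of websocket-sharp; it relies only on `System.Threading.Timer`, whose `Change` method is documented as thread-safe, so no explicit lock is used here.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of websocket-sharp): invokes onTimeout once
// if Reset() is not called again within the timeout window.
public sealed class ReceiveWatchdog : IDisposable
{
    private readonly System.Threading.Timer _timer;
    private readonly TimeSpan _timeout;

    public ReceiveWatchdog(TimeSpan timeout, Action onTimeout)
    {
        if (onTimeout == null) throw new ArgumentNullException(nameof(onTimeout));
        _timeout = timeout;
        // Created disarmed: nothing happens until Reset() is called.
        _timer = new System.Threading.Timer(
            _ => onTimeout(), null,
            Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    // Arm (or re-arm) the timer: one shot, due after the timeout.
    public void Reset() => _timer.Change(_timeout, Timeout.InfiniteTimeSpan);

    // Disarm the timer without disposing it.
    public void Stop() => _timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

    public void Dispose() => _timer.Dispose();
}
```

With such a class, OnOpen and OnMessage would call `Reset()`, and OnError and OnClose would call `Stop()`. The callback runs on a thread-pool thread, so any UI update made from it would still need `Invoke`.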
Munition answered 9/11, 2017 at 7:41 Comment(3)
Thanks, Evk, for the explanation. I am getting an error: 'Timer' is an ambiguous reference between 'System.Threading.Timer' and 'System.Windows.Forms.Timer'. How do I resolve this? – Kamenskuralski
@Kamenskuralski replace new Timer with new System.Threading.Timer. – Munition
Evk - I am trying to run your code, but the timer's TimerCallback is never called. I have created a separate thread for this. Can you please check here - #47455537 – Kamenskuralski

From your question, what I understand is that a message is sent every second, and you only want to know when the messages stop so that you can reconnect. If so, just use a timer that checks every second whether a message has arrived within the last second or more (for example, have a sentMessage() method set a boolean to true when a message is sent and false otherwise), and then issue the command to reconnect to the server.
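
A polling check of this kind can be sketched with a timestamp instead of a boolean. Everything here is an assumption for illustration: `HeartbeatMonitor`, `MarkReceived` and `IsStale` are hypothetical names, and the class only records when the last heartbeat arrived and reports whether the gap has exceeded a limit. It uses `Interlocked` so that the socket thread can write while the timer's Tick handler reads.

```csharp
using System;
using System.Threading;

// Hypothetical helper: remembers when the last heartbeat arrived and answers
// "has it been too long?". Safe to call from different threads.
public sealed class HeartbeatMonitor
{
    private readonly TimeSpan _maxGap;
    private long _lastTicks; // UTC ticks of the last heartbeat; 0 means none yet

    public HeartbeatMonitor(TimeSpan maxGap)
    {
        _maxGap = maxGap;
    }

    // Call from the OnMessage handler whenever a heartbeat message arrives.
    public void MarkReceived(DateTime utcNow) =>
        Interlocked.Exchange(ref _lastTicks, utcNow.Ticks);

    // Call from the timer's Tick handler.
    public bool IsStale(DateTime utcNow)
    {
        long last = Interlocked.Read(ref _lastTicks);
        if (last == 0) return false; // nothing received yet; the caller decides how to treat this
        return utcNow.Ticks - last > _maxGap.Ticks;
    }
}
```

In a Tick handler this would be used as `if (monitor.IsStale(DateTime.UtcNow)) { /* reconnect */ }`. Two things are worth checking when doing this with a System.Windows.Forms.Timer: the timer should be started on the UI thread (websocket-sharp appears to raise its events on another thread, so `timer1.Start()` may need to go through `Invoke`), and the elapsed time is more safely compared as a whole `TimeSpan` than by formatting it with `"ss"`, which yields only the seconds component. A limit somewhat above the one-second heartbeat interval, such as the 2 seconds used in the other answer, leaves room for jitter.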

Convolvulus answered 7/11, 2017 at 5:13 Comment(2)
Thanks, Muhammad Ali, for the reply. How do I apply the timer? I tried what you suggested, but it gets stuck in an endless loop of Tick events. – Kamenskuralski
A timer is simple: timer.Start() starts it. Then you need to check that your method is working properly, by checking after a second or whatever interval you want. – Convolvulus
