Redis + ActionController::Live threads not dying
Asked Answered
M

7

35

Background: We've built a chat feature in to one of our existing Rails applications. We're using the new ActionController::Live module and running Puma (with Nginx in production), and subscribing to messages through Redis. We're using EventSource client side to establish the connection asynchronously.

Problem Summary: Threads are never dying when the connection is terminated.

For example, should the user navigate away, close the browser, or even go to a different page within the application, a new thread is spawned (as expected), but the old one continues to live.

The problem as I presently see it is that when any of these situations occur, the server has no way of knowing whether the connection on the browser's end is terminated, until something attempts to write to this broken stream, which would never happen once the browser has moved away from the original page.

This problem seems to be documented on github, and similar questions are asked on StackOverflow here (pretty well exact same question) and here (regarding getting number of active threads).

The only solution I've been able to come up with, based on these posts, is to implement a type of thread / connection poker. Attempting to write to a broken connection generates an IOError which I can catch and properly close the connection, allowing the thread to die. This is the controller code for that solution:

def events
  response.headers["Content-Type"] = "text/event-stream"

  stream_error = false; # used by flusher thread to determine when to stop

  redis = Redis.new

  # Subscribe to our events
  redis.subscribe("message.create", "message.user_list_update") do |on| 
    on.message do |event, data| # when message is received, write to stream
      response.stream.write("messageType: '#{event}', data: #{data}\n\n")
    end

    # This is the monitor / connection poker thread
    # Periodically poke the connection by attempting to write to the stream
    flusher_thread = Thread.new do
      while !stream_error
        $redis.publish "message.create", "flusher_test"
        sleep 2.seconds
      end
    end
  end 

  rescue IOError
    logger.info "Stream closed"
    stream_error = true;
  ensure
    logger.info "Events action is quitting redis and closing stream!"
    redis.quit
    response.stream.close
end

(Note: the events method seems to get blocked on the subscribe method invocation. Everything else (the streaming) works properly so I assume this is normal.)

(Other note: the flusher thread concept makes more sense as a single long-running background process, a bit like a garbage thread collector. The problem with my implementation above is that a new thread is spawned for each connection, which is pointless. Anyone attempting to implement this concept should do it more like a single process, not so much as I've outlined. I'll update this post when I successfully re-implement this as a single background process.)

The downside of this solution is that we've only delayed or lessened the problem, not completely solved it. We still have 2 threads per user, in addition to other requests such as ajax, which seems terrible from a scaling perspective; it seems completely unattainable and impractical for a larger system with many possible concurrent connections.

I feel like I am missing something vital; I find it somewhat difficult to believe that Rails has a feature that is so obviously broken without implementing a custom connection-checker like I have done.

Question: How do we allow the connections / threads to die without implementing something corny such as a 'connection poker', or garbage thread collector?

As always let me know if I've left anything out.

Update Just to add a bit of extra info: Huetsch over at github posted this comment pointing out that SSE is based on TCP, which normally sends a FIN packet when the connection is closed, letting the other end (server in this case) know that its safe to close the connection. Huetsch points out that either the browser is not sending that packet (perhaps a bug in the EventSource library?), or Rails is not catching it or doing anything with it (definitely a bug in Rails, if that's the case). The search continues...

Another Update Using Wireshark, I can indeed see FIN packets being sent. Admittedly, I am not very knowledgeable or experienced with protocol level stuff, however from what I can tell, I definitely detect a FIN packet being sent from the browser when I establish the SSE connection using EventSource from the browser, and NO packet sent if I remove that connection (meaning no SSE). Though I'm not terribly up on my TCP knowledge, this seems to indicate to me that the connection is indeed being properly terminated by the client; perhaps this indicates a bug in Puma or Rails.

Yet another update @JamesBoutcher / boutcheratwest(github) pointed me to a discussion on the redis website regarding this issue, specifically in regards to the fact that the .(p)subscribe method never shuts down. The poster on that site pointed out the same thing that we've discovered here, that the Rails environment is never notified when the client-side connection is closed, and therefore is unable to execute the .(p)unsubscribe method. He inquires about a timeout for the .(p)subscribe method, which I think would work as well, though I'm not sure which method (the connection poker I've described above, or his timeout suggestion) would be a better solution. Ideally, for the connection poker solution, I'd like to find a way to determine whether the connection is closed on the other end without writing to the stream. As it is right now, as you can see, I have to implement client-side code to handle my "poking" message separately, which I believe is obtrusive and goofy as heck.

Mneme answered 23/9, 2013 at 23:17 Comment(6)
Also, I know I can use psubscribe to match anything with message.*; I originally had those two message keys as separate functions and just recently split them. Didn't bother to use psubscribe yet, but meh.Mneme
I have the same problem, and have ruled out missing FIN packets (I believe)... I've got Apache sitting in front of the rails server (Puma) in a proxy mode, and can kill Apache -- and the threads in Puma don't die.Steed
Solution doesn't scale very well with multiple users... You only need one extra thread for the generation of those flusher_test messages, not one per user, right?Steed
@JamesBoutcher No, that's correct, it really should not be one-per-user. I did it like this mainly to test the concept, but I wrote in the "Other Note" section below the solution that it should actually be implemented as a single thread, so that it behaves like a garbage collector (for threads). I haven't refactored the solution in that manner yet, but when I do I'll post the update. Huetsch on github said he did it with a cron job, which I think would work too. I assume his cron makes a periodic request to the Rails app which sends the $redis.publish message, or something to that effect.Mneme
@PaulRichter did you ever figure out an elegant solution to this?Wolbrom
@Wolbrom Unfortunately no, I never did. So far that I can tell the issue remains on-going. The heartbeat-style solution appears to be the only arguably acceptable solution. But admittedly I haven't looked in to this issue for a long time, as I'm no longer at the company in which I did this work. Maybe somebody, somewhere has found a solution. The github issue is still open and unresolved though, so I'm skeptical.Mneme
S
15

A solution I just did (borrowing a lot from @teeg) which seems to work okay (haven't failure tested it, tho)

config/initializers/redis.rb

$redis = Redis.new(:host => "xxxx.com", :port => 6379)

heartbeat_thread = Thread.new do
  while true
    $redis.publish("heartbeat","thump")
    sleep 30.seconds
  end
end

at_exit do
  # not sure this is needed, but just in case
  heartbeat_thread.kill
  $redis.quit
end

And then in my controller:

def events
    response.headers["Content-Type"] = "text/event-stream"
    redis = Redis.new(:host => "xxxxxxx.com", :port => 6379)
    logger.info "New stream starting, connecting to redis"
    redis.subscribe(['parse.new','heartbeat']) do |on|
      on.message do |event, data|
        if event == 'parse.new'
          response.stream.write("event: parse\ndata: #{data}\n\n")
        elsif event == 'heartbeat'
          response.stream.write("event: heartbeat\ndata: heartbeat\n\n")
        end
      end
    end
  rescue IOError
    logger.info "Stream closed"
  ensure
    logger.info "Stopping stream thread"
    redis.quit
    response.stream.close
  end
Steed answered 21/10, 2013 at 2:27 Comment(2)
what if client gets "hearbeat" event before the "parse.new" event?Gremial
why do a heartbeat and send a ping to thousands of clients at a fixed interval? Sorry, but any solution without a heartbeat would be preferable.Sleeping
J
5

I'm currently making an app that revolves around ActionController:Live, EventSource and Puma and for those that are encountering problems closing streams and such, instead of rescuing an IOError, in Rails 4.2 you need to rescue ClientDisconnected. Example:

def stream
  #Begin is not required
  twitter_client = Twitter::Streaming::Client.new(config_params) do |obj|
    # Do something
  end
rescue ClientDisconnected
  # Do something when disconnected
ensure
  # Do something else to ensure the stream is closed
end

I found this handy tip from this forum post (all the way at the bottom): http://railscasts.com/episodes/401-actioncontroller-live?view=comments

Jaguarundi answered 6/2, 2015 at 0:37 Comment(3)
Wow, that's promising. I can't test this at the moment, but are you saying that this rescue block is fired whenever the client is disconnected, without the need of any kind of "heartbeat" thread? I found the portion of code in the Rails source that raises this exception, and unless I'm misunderstanding how it works, it seems to only raise that exception when it attempts to write, so I'm a little unclear.Mneme
More than a month later... I think you are right that it raises the exception only when it tries to write, but in my case I was streaming about 50 tweets per second to a connected client, so the controller disconnected at basically the same time as the client. I ended up switching to use Websocket-Rails and after a long battle, got it to work beautifully.Jaguarundi
Alright fair enough. Thanks for the update. If/when I get back into this project I'll consider checkout out websocket-rails.Mneme
E
2

Here's a potentially simpler solution which does not use a heartbeat. After much research and experimentation, here's the code I'm using with sinatra + sinatra sse gem (which should be easily adapted to Rails 4):

class EventServer < Sinatra::Base
 include Sinatra::SSE
 set :connections, []
 .
 .
 .
 get '/channel/:channel' do
 .
 .
 .
  sse_stream do |out|
    settings.connections << out
    out.callback {
      puts 'Client disconnected from sse';
      settings.connections.delete(out);
    }
  redis.subscribe(channel) do |on|
      on.subscribe do |channel, subscriptions|
        puts "Subscribed to redis ##{channel}\n"
      end
      on.message do |channel, message|
        puts "Message from redis ##{channel}: #{message}\n"
        message = JSON.parse(message)
        .
        .
        .
        if settings.connections.include?(out)
          out.push(message)
        else
          puts 'closing orphaned redis connection'
          redis.unsubscribe
        end
      end
    end
  end
end

The redis connection blocks on.message and only accepts (p)subscribe/(p)unsubscribe commands. Once you unsubscribe, the redis connection is no longer blocked and can be released by the web server object which was instantiated by the initial sse request. It automatically clears when you receive a message on redis and sse connection to the browser no longer exists in the collection array.

Eldoria answered 9/4, 2014 at 19:4 Comment(0)
G
2

Building on @James Boutcher, I used the following in clustered Puma with 2 workers, so that I have only 1 thread created for the heartbeat in config/initializers/redis.rb:

config/puma.rb

on_worker_boot do |index|
  puts "worker nb #{index.to_s} booting"
  create_heartbeat if index.to_i==0
end

def create_heartbeat
  puts "creating heartbeat"
  $redis||=Redis.new
  heartbeat = Thread.new do
    ActiveRecord::Base.connection_pool.release_connection
    begin
      while true
        hash={event: "heartbeat",data: "heartbeat"}
        $redis.publish("heartbeat",hash.to_json)
        sleep 20.seconds
      end
    ensure
      #no db connection anyway
    end
  end
end
Guthrun answered 24/12, 2014 at 0:37 Comment(0)
C
2

Here you are solution with timeout that will exit blocking Redis.(p)subscribe call and kill unused connection tread.

class Stream::FixedController < StreamController
  def events
    # Rails reserve a db connection from connection pool for
    # each request, lets put it back into connection pool.
    ActiveRecord::Base.clear_active_connections!

    # Last time of any (except heartbeat) activity on stream
    # it mean last time of any message was send from server to client
    # or time of setting new connection
    @last_active = Time.zone.now

    # Redis (p)subscribe is blocking request so we need do some trick
    # to prevent it freeze request forever.
    redis.psubscribe("messages:*", 'heartbeat') do |on|
      on.pmessage do |pattern, event, data|
        # capture heartbeat from Redis pub/sub
        if event == 'heartbeat'
          # calculate idle time (in secounds) for this stream connection
          idle_time = (Time.zone.now - @last_active).to_i

          # Now we need to relase connection with Redis.(p)subscribe
          # chanel to allow go of any Exception (like connection closed)
          if idle_time > 4.minutes
            # unsubscribe from Redis because of idle time was to long
            # that's all - fix in (almost)one line :)
            redis.punsubscribe
          end
        else
          # save time of this (last) activity
          @last_active = Time.zone.now
        end
        # write to stream - even heartbeat - it's sometimes chance to
        # capture dissconection error before idle_time
        response.stream.write("event: #{event}\ndata: #{data}\n\n")
      end
    end
    # blicking end (no chance to get below this line without unsubscribe)
  rescue IOError
    Logs::Stream.info "Stream closed"
  rescue ClientDisconnected
    Logs::Stream.info "ClientDisconnected"
  rescue ActionController::Live::ClientDisconnected
    Logs::Stream.info "Live::ClientDisconnected"
  ensure
    Logs::Stream.info "Stream ensure close"
    redis.quit
    response.stream.close
  end
end

You have to use reds.(p)unsubscribe to end this blocking call. No exception can break this.

My simple app with information about this fix: https://github.com/piotr-kedziak/redis-subscribe-stream-puma-fix

Commination answered 13/11, 2015 at 7:26 Comment(0)
S
1

Instead of sending a heartbeat to all the clients, it might be easier to just set a watchdog for each connection. [Thanks to @NeilJewers]

class Stream::FixedController < StreamController
  def events
    # Rails reserve a db connection from connection pool for
    # each request, lets put it back into connection pool.
    ActiveRecord::Base.clear_active_connections!

    redis = Redis.new

    watchdog = Doberman::WatchDog.new(:timeout => 20.seconds)
    watchdog.start

    # Redis (p)subscribe is blocking request so we need do some trick
    # to prevent it freeze request forever.
    redis.psubscribe("messages:*") do |on|
      on.pmessage do |pattern, event, data|
        begin
          # write to stream - even heartbeat - it's sometimes chance to
          response.stream.write("event: #{event}\ndata: #{data}\n\n")
          watchdog.ping

        rescue Doberman::WatchDog::Timeout => e
          raise ClientDisconnected if response.stream.closed?
          watchdog.ping
        end
      end
    end

  rescue IOError
  rescue ClientDisconnected

  ensure
    response.stream.close
    redis.quit
    watchdog.stop
  end
end
Sleeping answered 28/10, 2017 at 22:46 Comment(0)
P
1

If you can tolerate a small chance of missing a message you can use subscribe_with_timeout:

sse = SSE.new(response.stream)
sse.write("hi", event: "hello")
redis = Redis.new(reconnect_attempts: 0)
loop do
  begin
    redis.subscribe_with_timeout(5 * 60, 'mycoolchannel') do |on|
      on.message do |channel, message|
        sse.write(message, event: 'message_posted')
      end
    end
  rescue Redis::TimeoutError
    sse.write("ping", event: "ping")
  end
end

This code subscribes to a Redis channel, waits for 5 minutes, then closes connection to Redis and subscribes again.

Psychognosis answered 4/10, 2021 at 21:33 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.