Thread and Queue
S

6

28

I am interested in knowing what would be the best way to implement a thread based queue.

For example:

I have 10 actions which I want to execute with only 4 threads. I would like to create a queue with all 10 actions placed in order and start the first 4 actions with 4 threads. Once one of the threads is done executing, the next action starts, and so on. So at any time, the number of running threads is 4 or fewer.

Sholom answered 2/7, 2011 at 18:25 Comment(0)
L
30

There is a Queue class in the standard library (require 'thread'). Using that you can do something like this:

require 'thread'

queue = Queue.new
threads = []

# add work to the queue; work_units is a placeholder for your own list of jobs
work_units.each { |work_unit| queue << work_unit }

4.times do
  threads << Thread.new do
    # loop until there are no more things to do
    until queue.empty?
      # pop with the non-blocking flag set, this raises
      # an exception if the queue is empty, in which case
      # work_unit will be set to nil
      work_unit = queue.pop(true) rescue nil
      if work_unit
        # do work
      end
    end
    # when there is no more work, the thread will stop
  end
end

# wait until all threads have completed processing
threads.each { |t| t.join }

The reason I pop with the non-blocking flag is that between the until queue.empty? check and the pop, another thread may have popped the last item from the queue, so unless the non-blocking flag is set we could get stuck at that line forever.

If you're using MRI, the default Ruby interpreter, bear in mind that threads will not run truly in parallel: a global interpreter lock lets only one thread execute Ruby code at a time. If your work is CPU bound you may just as well run single threaded. If you have some operation that blocks on IO you may get some parallelism, but YMMV. Alternatively, you can use an interpreter that allows full parallelism, such as JRuby or Rubinius.
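To make the IO-bound case concrete, here is a small sketch that uses sleep as a stand-in for a blocking IO call. The helper name fake_io and the timings are assumptions for illustration only. Under MRI a sleeping thread releases the interpreter lock, so four 0.2-second waits should overlap and finish in roughly 0.2 seconds rather than 0.8.

```ruby
require 'thread'

# Hypothetical stand-in for a blocking IO call; a real task would read
# from a socket or file. sleep releases MRI's interpreter lock while waiting.
def fake_io(seconds)
  sleep seconds
  :ok
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Start all four threads first, then collect each thread's return value.
results = 4.times.map { Thread.new { fake_io(0.2) } }.map(&:value)

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts "4 waits of 0.2s took #{elapsed.round(2)}s with threads"
```

Replacing the sleep with a pure-Ruby computation would likely show no such speedup on MRI, which is the CPU-bound case described above.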

Liatris answered 2/7, 2011 at 18:47 Comment(2)
In the pickaxe, it suggests having 4 :END_OF_WORK work_units instead of having a non-blocking pop. Also, your last statement about threads not having CPUs run concurrently applies to YARV, but not to JRuby.Gymnastics
@AndrewGrimm, I like this answer because sometimes you want to have a work queue and threads lingering around to do work whenever a new work item is added.Edson
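The sentinel approach mentioned in the first comment could look roughly like the sketch below. It is only an illustration: the integer work units and the doubling step are placeholders, and the results queue is an addition for collecting output. Each worker uses a blocking pop and exits when it receives its own :END_OF_WORK marker, so no rescue is needed.

```ruby
require 'thread'

worker_count = 4
queue   = Queue.new
results = Queue.new   # thread-safe collector for finished work

# Enqueue the real work first (plain integers as placeholder work units)...
(1..10).each { |n| queue << n }
# ...then one sentinel per worker, so every thread receives exactly one.
worker_count.times { queue << :END_OF_WORK }

threads = worker_count.times.map do
  Thread.new do
    # Blocking pop: the thread sleeps until an item is available.
    while (work_unit = queue.pop) != :END_OF_WORK
      results << work_unit * 2   # placeholder for the real work
    end
  end
end

threads.each(&:join)

# Drain the results queue into a plain array.
processed = []
processed << results.pop until results.empty?
puts processed.sort.inspect
```

Because the pop blocks, the same workers can also stay alive waiting for items that are added later; they only stop once the sentinels are pushed.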
T
7

There are a few gems that implement this pattern for you: parallel, peach, and mine, which is called threach (or jruby_threach under JRuby). It's a drop-in replacement for #each but allows you to specify how many threads to run with, using a SizedQueue underneath to keep things from spiraling out of control.

So...

(1..10).threach(4) {|i| do_my_work(i) }

Not pushing my own stuff; there are plenty of good implementations out there to make things easier.

If you're using JRuby, jruby_threach is a much better implementation; Java simply offers a much richer set of threading primitives and data structures to use.
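For readers curious how a SizedQueue keeps things bounded, here is a minimal sketch of the general idea. It is not threach's actual implementation: the helper name each_in_threads and its buffer_size parameter are made up for this example. The point is that pushing onto a full SizedQueue blocks, so the producer cannot run far ahead of the workers.

```ruby
require 'thread'

# Hypothetical helper, not part of threach or any other gem.
def each_in_threads(items, thread_count, buffer_size = thread_count * 2, &block)
  queue = SizedQueue.new(buffer_size)   # push blocks while the queue is full
  done  = Object.new                    # unique sentinel marking end of input

  workers = thread_count.times.map do
    Thread.new do
      # Keep popping until this worker receives the sentinel object.
      until (item = queue.pop).equal?(done)
        block.call(item)
      end
    end
  end

  items.each { |item| queue << item }   # producer is throttled by the queue
  thread_count.times { queue << done }  # one sentinel per worker
  workers.each(&:join)
end

squares = Queue.new
each_in_threads(1..10, 4) { |i| squares << i * i }
puts "processed #{squares.size} items"
```

With a plain Queue the whole input would be loaded into memory up front; the bounded queue matters mostly when the input is large or lazily generated.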

Torchbearer answered 12/8, 2011 at 1:33 Comment(0)
P
5

Executable descriptive example:

require 'thread'

p tasks = [
    {:file => 'task1'},
    {:file => 'task2'},
    {:file => 'task3'},
    {:file => 'task4'},
    {:file => 'task5'}
]

tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}

# run workers
workers_count = 3
workers = []
workers_count.times do |n|
    workers << Thread.new(n+1) do |my_n|
        while (task = tasks_queue.shift(true) rescue nil) do
            delay = rand(0)
            sleep delay
            task[:result] = "done by worker ##{my_n} (in #{delay})"
            p task
        end
    end
end

# wait for all threads
workers.each(&:join)

# output results
puts "all done"
p tasks
Prodigal answered 20/12, 2012 at 9:2 Comment(0)
S
4

You could use a thread pool. It's a fairly common pattern for this type of problem.
http://en.wikipedia.org/wiki/Thread_pool_pattern

Github seems to have a few implementations you could try out:
https://github.com/search?type=Everything&language=Ruby&q=thread+pool
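As a rough idea of what such a pool does internally, here is a minimal sketch. The class name TinyPool and its methods schedule and shutdown are invented for illustration and are not taken from any of the linked implementations. It also does no error handling: a job that raises would kill its worker thread.

```ruby
require 'thread'

# Minimal illustrative pool; class and method names are hypothetical.
class TinyPool
  def initialize(size)
    @jobs = Queue.new
    @workers = size.times.map do
      Thread.new do
        # Blocking pop keeps idle workers asleep until a job arrives.
        while (job = @jobs.pop) != :shutdown
          job.call
        end
      end
    end
  end

  # Queue a block to be run by whichever worker becomes free first.
  def schedule(&job)
    @jobs << job
  end

  # Let queued jobs finish, then stop every worker and wait for it.
  def shutdown
    @workers.size.times { @jobs << :shutdown }
    @workers.each(&:join)
  end
end

pool = TinyPool.new(4)
finished = Queue.new
10.times { |i| pool.schedule { finished << i } }
pool.shutdown
puts "ran #{finished.size} jobs"
```

At most 4 jobs run at once, which matches the question's requirement of 10 actions on 4 threads.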

Stockton answered 2/7, 2011 at 18:39 Comment(0)
C
1

Celluloid has a worker pool example that does this.

Colter answered 3/10, 2014 at 8:11 Comment(0)
U
1

I use a gem called work_queue. It's really practical.

Example:

require 'work_queue'
wq = WorkQueue.new 4, 10
(1..10).each do |number|
    wq.enqueue_b("Thread#{number}") do |thread_name|  
        puts "Hello from the #{thread_name}"
    end
end
wq.join
Undaunted answered 5/7, 2016 at 17:6 Comment(0)
