Quickly adding multiple items (1000/sec) to a sidekiq queue?

I realize there is a push_bulk option for sidekiq, but I'm currently limited by latency to redis, so passing multiple items via push_bulk still isn't fast enough (only about 50/s).

I've tried to increase the number of redis connections like so:

redis_conn = proc {
  Redis.new({ :url => Rails.configuration.redis.url })
}

Sidekiq.configure_client do |config|
  config.redis = ConnectionPool.new(size: 50, &redis_conn)
  config.client_middleware do |chain|
    chain.add Sidekiq::Status::ClientMiddleware
  end
end

I then fire off separate threads (Thread.new) that each call perform_async on their share of the objects. What is interesting is that jobs from any thread other than the first NEVER make it into the sidekiq queue; it's as if they're ignored entirely.
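
For reference, the threaded version looks roughly like this (a simplified sketch of what I described; the group size of 1000 is arbitrary):

user_ids = User.need_scraping.pluck(:id)

# Each thread enqueues its own slice of ids via perform_async.
threads = user_ids.in_groups_of(1000, false).map do |group|
  Thread.new do
    group.each { |user_id| ScrapeUser.perform_async(user_id) }
  end
end
threads.each(&:join)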

Does anyone know of a better way to do this?

Edit: Here is the push_bulk code I was trying, which was actually slower:

  user_ids = User.need_scraping.pluck(:id)
  bar = ProgressBar.new(user_ids.count)
  user_ids.in_groups_of(10000, false).each do |user_id_group|
    Sidekiq::Client.push_bulk(
      'args'  => user_id_group.map{ |user_id| [user_id] },
      'class' => ScrapeUser,
      'queue' => 'scrape_user',
      'retry' => true
    )
  end

Thanks!

Pomander answered 18/12, 2013 at 21:25 Comment(0)

You DO want to use push_bulk. You're limited by the latency/round-trip time to write elements to the redis queue backing sidekiq.

You're using multiple threads/connections to overcome a slow network, when you should really be removing extra network roundtrips.

Assuming you're trying to enqueue 20k UserWorker jobs that take a user_id:

You would enqueue a single job via:

UserWorker.perform_async(user_id)

... which maps to:

Sidekiq::Client.push('class' => UserWorker, 'args' => [user_id] )

So the push_bulk version for 20k user_ids is:

# This example takes 20k user_ids in an array, chunks them into groups of 1000 ids,
# and batch sends them to redis as a group.

User.need_scraping.select('id').find_in_batches do |user_group|

  sidekiq_items = user_group.map {|user| { 'class' => UserWorker, 'args' => [user.id] } }
  Sidekiq::Client.push_bulk(sidekiq_items)
end

This turns 20k redis calls into 20 redis calls (find_in_batches uses batches of 1000 by default). With an optimistic average round trip time of 5ms, that's well under a second of round trips instead of roughly 100 seconds. Your mileage may vary.

EDIT: Commenters seem confused about the behavior of the Sidekiq/Redis client for bulk enqueuing data.

The Sidekiq::Client.push_bulk() method takes an array of jobs to be enqueued. It translates these into Sidekiq job payload hashes, and then calls Sidekiq::Client.raw_push() to deliver these payloads to redis. See source: https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L158

Sidekiq::Client.raw_push() takes a list of Sidekiq hash payloads, converts them to JSON, and then executes a redis MULTI transaction combining two redis commands. First, it adds the targeted queue to the set of active queues (redis SADD), then it pushes all of the job payloads onto the targeted queue's redis list (redis LPUSH). The whole batch is sent and executed together as a single atomic group.
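
Conceptually the batch boils down to something like this (a simplified sketch, not the actual Sidekiq source; conn and payloads_as_json are placeholder names):

# One MULTI per batch, so the whole batch costs a single network round trip.
conn.multi do |transaction|
  transaction.sadd('queues', 'scrape_user')                 # register the queue
  transaction.lpush('queue:scrape_user', payloads_as_json)  # push every JSON payload in one command
end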

If this is still slow, you likely have other problems (slow network, overloaded redis server, etc).
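
A quick sanity check is to time raw round trips against the same Redis instance Sidekiq is using (a rough sketch, reusing the redis URL from the question):

require 'benchmark'
require 'redis'

# Average a batch of PINGs to estimate the network round trip to redis.
redis = Redis.new(url: Rails.configuration.redis.url)
samples = 100
total = Benchmark.realtime { samples.times { redis.ping } }
puts "avg round trip: #{(total / samples * 1000).round(2)} ms"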

Essa answered 19/12, 2013 at 21:31 Comment(6)
This is exactly what I was trying, and it's actually slower than just looping through each record individually. – Pomander
If that code is slow, it's because you're doing something more fundamentally wrong, like loading the User model for each user record in your db. Try dropping your batch size down to 100-1000. Code updated. user_ids = User.need_scraping.select('id').find_in_batches do |user_ids| – Essa
When looking at the source of sidekiq, it seems that it still actually sends each item to redis individually. I don't see a performance gain at all with push_bulk :/ Related: github.com/mperham/sidekiq/issues/1410 – Pomander
You're mistaken that Sidekiq's bulk_push doesn't batch jobs. Adding more detail in the answer to explain. – Essa
Small detail: the method is called push_bulk, not bulk_push. :) Also, looking at the source, I don't think 'class' => UserWorker should be repeated for each job, only the args, like this: Sidekiq::Client.push_bulk('class' => UserWorker, 'args' => user_group.map(&:id)). Relevant code here: github.com/mperham/sidekiq/blob/master/lib/sidekiq/… – Japonica
In case it's useful to others, I've written a small gem that makes push_bulk available on job/worker classes directly: github.com/aprescott/sidekiq-bulk – Kilo

@Winfield's answer is correct, and he's absolutely right about latency. However, the correct syntax is actually as follows:

User.need_scraping.select('id').find_in_batches do |user_group|
  Sidekiq::Client.push_bulk({ 'class' => UserWorker, 'args' => user_group.map {|user| [user.id] } })
end

Maybe it changed in the most recent Sidekiq (I was too lazy to check), but this is the correct syntax now.

Intelligible answered 5/5, 2016 at 16:27 Comment(0)

Following on from @michael-y's answer, there now is a perform_bulk method that you can call on your sidekiq job class.

Here is a sample, building on the examples on this page:

User.need_scraping.select('id').find_in_batches do |user_group|
  args = user_group.map { |user| [user.id] } # must be an array of arrays
  UserWorker.perform_bulk(args)
end
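
If memory isn't a concern, recent Sidekiq versions also let perform_bulk do the chunking itself via a batch_size option (1000 by default; check the docs for your Sidekiq version):

args = User.need_scraping.pluck(:id).map { |id| [id] } # still an array of arrays
UserWorker.perform_bulk(args, batch_size: 1_000)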

Hardpressed answered 3/11, 2023 at 11:18 Comment(0)
