How to achieve high concurrency with spray.io in this Future and Thread.sleep example?

I was trying the following POC to see how to achieve high concurrency:

  implicit def executionContext = context.system.dispatchers.lookup("async-futures-dispatcher")
  implicit val timeout = 10.seconds

  val contestroute = path("contestroute") {
    get {
      respondWithMediaType(`application/json`) {
        dynamic {
          onSuccess(
            Future {
              val start = System.currentTimeMillis()
              // NB: this sleep should be handled by the 200 threads defined in the
              // dispatcher config below, so why does the overall time take so long?
              // Why doesn't it utilize all the threads I have given it? How do I
              // update the code so that it does?
              Thread.sleep(5000)
              val status = s"timediff ${System.currentTimeMillis() - start}ms ${Thread.currentThread().getName}"
              status
            }) { time =>
            complete(s"status: $time")
          }
        }
      }
    }
  }

My config:

async-futures-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 200
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 20.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 200
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
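
For reference, here is a standalone sanity check of the dispatcher on its own (hypothetical object name; it assumes the config above is in application.conf). If the pool is healthy, 200 parallel sleeps should finish in roughly 5 seconds:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem

object DispatcherSanityCheck extends App {
  val system = ActorSystem("poc") // loads application.conf, including the block above
  implicit val blockingEc = system.dispatchers.lookup("async-futures-dispatcher")

  val start = System.currentTimeMillis()
  // 200 blocking sleeps submitted directly to the 200-thread pool.
  val all = Future.sequence((1 to 200).map(_ => Future(Thread.sleep(5000))))
  Await.result(all, 1.minute)

  // Expect roughly 5000 ms if every task really got its own thread.
  println(s"total: ${System.currentTimeMillis() - start}ms")
  system.shutdown()
}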

However, when I run Apache Bench like this:

ab -n 200 -c 50 http://LAP:8080/contestroute

Results I get are:

Server Software:        Apache-Coyote/1.1
Server Port:            8080

Document Path:          /contestroute
Document Length:        69 bytes

Concurrency Level:      150
Time taken for tests:   34.776 seconds
Complete requests:      150
Failed requests:        0
Write errors:           0
Non-2xx responses:      150
Total transferred:      37500 bytes
HTML transferred:       10350 bytes
Requests per second:    4.31 [#/sec] (mean)
Time per request:       34776.278 [ms] (mean)
Time per request:       231.842 [ms] (mean, across all concurrent requests)
Transfer rate:          1.05 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        5  406 1021.3      7    3001
Processing: 30132 30466 390.8  30308   31231
Waiting:    30131 30464 391.8  30306   31231
Total:      30140 30872 998.9  30353   33228

Am I missing something big? What do I need to change so that spray and my futures utilize all the threads I have given them?

(To add: I'm running on top of Tomcat with Servlet 3.0.)

Cutoff asked 27/10, 2014 at 18:06 (6 comments):
I know it's example code, but why sleep inside the code that is executing as a Future? By doing that you are basically taking the thread that is running that Future out of commission for 5 seconds, and it cannot be assigned to another actor/future at all during those 5 seconds. – Watchband
I'm simulating a long-running operation, so I can see whether I can utilize 200 threads to run it. – Cutoff
Are you sure that the underlying actor system for spray is not also picking up that async-futures-dispatcher execution context? If it is inadvertently picking it up (because it's defined as implicit), then that might be part of the problem. It's a good idea to firewall off blocking code into a separate execution context from the Akka main dispatcher, as you have done here, but it would not serve its purpose if Akka is also using this execution context to run the actors that represent this route. – Watchband
I don't think it uses it, but even if it did, I don't see how it could cause this situation: after all, this defines 200 threads, and the default thread pool for the root system should be similar to the number of cores, maybe x2, so I don't see why this should have any effect. – Cutoff
I want to point out that this whole scenario is a very specific use case. In general, having 200 threads to do anything is almost always the wrong solution. "Getting high concurrency" in Akka is usually achieved by chopping work down into smaller pieces and distributing this work onto actors, avoiding blocking wherever possible. When you do that properly, there's no need for more threads than the number of CPUs/hyperthreads on one machine. This is the way to get the best concurrency with the least resource consumption. – Epiphenomenalism
This also depends on what your actual bottleneck is. If your long-running operations are CPU-bound, adding more threads will only worsen the situation (more threads battling for the same amount of CPU resources); similarly if your load is IO-bound. In some cases it may be useful to create an extra thread pool if you need to access an external service (like a DB) and you only have a blocking DB driver. Even then, it's still questionable whether the DB itself can handle the load in parallel, or whether it would make sense to have a queue on your side to funnel the DB requests through. – Epiphenomenalism
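
To make the non-blocking approach from the last two comments concrete, here is a minimal sketch (hypothetical object and actor-system names) that simulates the same 5-second operation with the Akka scheduler via akka.pattern.after instead of Thread.sleep, so no thread is parked while the "work" is in flight:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.pattern.after

object NonBlockingSimulation extends App {
  val system = ActorSystem("poc") // hypothetical name
  import system.dispatcher        // default dispatcher; nothing blocks on it

  // The scheduler completes the Future after 5 seconds without parking a
  // thread, so a few threads can keep many such "operations" in flight.
  def longRunningOp(): Future[String] =
    after(5.seconds, system.scheduler)(Future.successful("done"))

  println(Await.result(longRunningOp(), 10.seconds))
  system.shutdown()
}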

In your example, all spray operations and the blocking operations happen in the same context. You need to split them into two separate contexts. Also, I don't see a reason to use dynamic here; a plain complete should be enough:

import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import akka.util.Timeout
import spray.routing.HttpServiceActor

// The snippet implies an HttpServiceActor (runRoute, context.dispatcher);
// the class name here is illustrative.
class ContestRouteActor extends HttpServiceActor {

  implicit val timeout: Timeout = 10.seconds

  // Execution context dedicated to the blocking operations
  val blockingExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2000))

  // Execution context for spray itself: the actor's own dispatcher
  import context.dispatcher

  override def receive: Receive = runRoute(contestroute)

  val contestroute = path("contestroute") {
    get {
      complete {
        // The blocking sleep runs on the dedicated pool, so spray's
        // dispatcher is never starved by it.
        Future {
          val start = System.currentTimeMillis()
          Thread.sleep(5000)
          s"timediff ${System.currentTimeMillis() - start}ms ${Thread.currentThread().getName}"
        }(blockingExecutionContext)
      }
    }
  }
}

After that you can test it with:

ab -n 200 -c 200 http://LAP:8080/contestroute

and you'll see that spray will create all 200 threads for the blocking operations.

Results:

Concurrency Level:      200
Time taken for tests:   5.096 seconds
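
If you prefer to keep the pool in configuration rather than building it in code, the same split works with the async-futures-dispatcher block from the question: look the dispatcher up and use it only for the blocking futures. A sketch of the one definition that changes (same actor setup as above):

// Sketch: reuse the dispatcher defined in application.conf for the blocking
// work instead of Executors.newFixedThreadPool; an Akka MessageDispatcher is
// itself an ExecutionContext.
val blockingExecutionContext: ExecutionContext =
  context.system.dispatchers.lookup("async-futures-dispatcher")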
Sextant answered 27/10, 2014 at 22:19 (3 comments):
Small correction: it's not spray that "create[s] all 200 threads"; that's a feature of Scala/Akka. – Epiphenomenalism
This is pretty much what I said in my comment: spray was also using the context where the blocking was occurring, thus severely limiting the throughput of the system. – Watchband
Doesn't this approach mean that a single thread is still processing all of the routing (a bottleneck, plus the constant danger of blocking all requests)? In order to achieve high concurrency, shouldn't the server hand management of the incoming request to a new thread as soon as possible? – Copious
