Nim inter-thread message passing: How to avoid a global TChannel?

I have the following simple example of an inter-thread communication problem: I want to run arbitrary "anytime" algorithms in a background thread. An anytime algorithm performs some computation of result type T incrementally, i.e., it sporadically produces newer, more precise results. In Nim parlance, they are probably best represented by an iterator. In the main thread, I now want to wrap such iterators each in its own thread, with the possibility to query the threads for things like "is there a new value available" or "what is the current computation result".

Since I'm not familiar with Nim's concurrency concepts, I have trouble implementing the required inter-thread communication. My idea was to use a TChannel for the communication. According to this forum post, a TChannel cannot be used in combination with spawn but requires createThread instead. I managed to get the following to compile and run:

import os, threadpool

proc spawnBackgroundJob[T](f: iterator (): T): TChannel[T] =

  type Args = tuple[iter: iterator (): T, channel: ptr TChannel[T]]

  # I think I have to wrap the iterator to pass it to createThread
  proc threadFunc(args: Args) {.thread.} =
    echo "Thread is starting"
    let iter = args.iter
    var channel = args.channel[]

    for i in iter():
      echo "Sending ", i
      channel.send(i)

  var thread: TThread[Args]
  var channel: TChannel[T]
  channel.open()

  let args = (f, channel.addr)
  createThread(thread, threadFunc, args)

  result = channel


# example use in some main thread:
iterator test(): int {.closure.} =
  sleep(500)
  yield 1
  sleep(500)
  yield 2

var channel = spawnBackgroundJob[int](test)

for i in 0 .. 10:
  sleep(200)
  echo channel.peek()

echo "Finished"

Unfortunately, this does not behave as expected: I never receive anything in the main thread. I was told on IRC that the problem is that I do not use global variables. But even after thinking about it for a long time, I see neither exactly why this fails nor whether there is a way to solve it. The problem is that I cannot simply make the variables thread and channel global, since they depend on the type T. I also want to avoid restricting this to a single anytime algorithm (or some other fixed number N). I was also told that the approach does not really make sense overall, so maybe I'm just missing that this problem has an entirely different solution?

Luminiferous answered 29/4, 2015 at 18:46 Comment(0)

Reason:

The thread sends on one channel while the main thread reads from a different one.

Object assignment in Nim is a deep copy, so each of the following two statements produces a new, separate channel object:

var channel = args.channel[]

and

result = channel

To explain it, see code snippet below:

type
  A = object
    x: int
    y: int

var a,b: A

var c = cast[ptr A](allocShared0(sizeof(A))) # shared memory allocation

a = c[]
b = c[]

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 000000

a.x = 1
a.y = 2

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 120000

b.x = 3
b.y = 4

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 123400
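
For contrast, here is a minimal sketch of the opposite case: copying the pointer rather than the object keeps both names referring to the same memory. The names `local` and `p` are made up for this illustration, and the allocation is deliberately never freed to keep the sketch short.

```nim
type
  A = object
    x: int
    y: int

var c = cast[ptr A](allocShared0(sizeof(A)))  # shared memory allocation

var local = c[]  # copies the object: an independent value
let p = c        # copies only the pointer: still the same object

local.x = 1      # changes the private copy only
p.y = 2          # writes through the pointer into the shared object

echo local.x, local.y, c.x, c.y  # output: 1002
```

This is why the thread has to keep working through `args.channel` itself (for example `args.channel[].send(i)`) instead of dereferencing it into a local variable first.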

Passing a channel into and out of a proc:

To pass a channel as a parameter or return value, please refer to Jehan's answer in the Nim forum.

Jehan's answer is pasted here for quick reference, adjusted so that it compiles with Nim 0.11.2:

type SharedChannel[T] = ptr TChannel[T]

proc newSharedChannel[T](): SharedChannel[T] =
  result = cast[SharedChannel[T]](allocShared0(sizeof(TChannel[T])))
  open(result[])

proc close[T](ch: var SharedChannel[T]) =
  close(ch[])
  deallocShared(ch)
  ch = nil

proc send[T](ch: SharedChannel[T], content: T) =
  ch[].send(content)


proc recv[T](ch: SharedChannel[T]): T =
  result = ch[].recv

proc someThread(ch: (SharedChannel[string], SharedChannel[bool])) {.thread.} =
  let (mainChannel, responseChannel) = ch
  while true:
    let s = mainChannel.recv
    if s == nil:
      break
    echo s
    responseChannel.send(true)
  responseChannel.send(false)

proc main() =
  var
    mainChannel = newSharedChannel[string]()
    responseChannel = newSharedChannel[bool]()
    th: TThread[(SharedChannel[string], SharedChannel[bool])]
  createThread(th, someThread, (mainChannel, responseChannel))
  for i in 0..2:
    echo("main thread send: " & $i)
    mainChannel.send($i)
    if not responseChannel.recv:
      break
  mainChannel.send(nil)
  joinThread(th)
  close(mainChannel)
  close(responseChannel)

main()

Output:

main thread send: 0
0
main thread send: 1
1
main thread send: 2
2

Going one step further, here is a solution to the original question:

import os, threadpool, macros

template spawnBackgroundJob(t: typedesc, chan:ptr TChannel[t], iter: expr): stmt {.immediate.}=
  block:
    proc threadFunc(channel: ptr TChannel[t]) {.thread.} =
      echo "Thread is starting"

      for i in iter:
        echo "Sending ", i
        channel[].send(i)

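    # Open the channel passed in as `chan`; the original `channel[].open()`
    # only worked because the caller's global happened to be named `channel`.
    chan[].open()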

    var thread: TThread[ptr TChannel[t]]
    createThread(thread, threadFunc, chan)
    #joinThread(thread)


# example use in some main thread:
iterator testJob(): int =
  yield 0
  sleep(500)
  yield 1
  sleep(500)
  yield 2

var channel: ptr TChannel[int]
channel = cast[ptr TChannel[int]](allocShared0(sizeof(TChannel[int])))
spawnBackgroundJob(type(int), channel, testJob())

for i in 1 .. 10:
  sleep(200)
  echo channel[].peek()

channel[].close()
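
The code above only prints the number of queued messages via peek. To actually read "the current computation result", the main thread can drain the channel with tryRecv and keep the newest value. The following is a rough sketch of that idea, not a tested part of the answer: it assumes a newer Nim in which TThread and TChannel are named Thread and Channel, compiled with --threads:on, and the names `Job`, `worker` and `pollLatest` are hypothetical. A plain counting loop stands in for the anytime algorithm.

```nim
# Sketch for newer Nim (Thread/Channel naming); compile with --threads:on.
import os

type Job = tuple[chan: ptr Channel[int], steps: int]

proc worker(job: Job) {.thread.} =
  # Stand-in for an anytime algorithm: emits successively newer results.
  for i in 1 .. job.steps:
    sleep(50)
    job.chan[].send(i)

proc pollLatest(chan: ptr Channel[int], current: var int): bool =
  # Drains everything queued so far without blocking.
  # Returns true if at least one newer value arrived.
  while true:
    let (available, value) = chan[].tryRecv()
    if not available: break
    current = value
    result = true

var chan = cast[ptr Channel[int]](allocShared0(sizeof(Channel[int])))
chan[].open()

var th: Thread[Job]
let job: Job = (chan, 3)
createThread(th, worker, job)

var best = 0
joinThread(th)                        # the sketch waits; a real caller would poll periodically
let gotNew = pollLatest(chan, best)
echo best                             # the last value sent by the worker

chan[].close()
deallocShared(chan)
```

Because the channel lives in shared memory and is only ever accessed through the pointer, nothing here depends on a global variable, and each background job can get its own channel.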
Quality answered 28/8, 2015 at 7:18 Comment(5)
I'm not sure if that is the only problem here. At least, according to this answer, I would guess that this should work. I also wasn't able to use Jehan's answer to solve my actual problem of running an iterator in a background thread. - Luminiferous
@Luminiferous Jehan's answer is for Nim 0.11.3; for that version, rename TThread to Thread and TChannel to Channel. I pasted a version for Nim 0.11.2 here, which should work. - Quality
The problem is that I could not get it to work with closure iterators. Since my anytime algorithms are closures, I cannot simply put them into a global proc someThread like in this example. - Luminiferous
@Luminiferous Here is a workaround for your case. - Quality
Nice, this seems to work just fine, thanks a lot! Would you mind putting this code into your answer instead of Jehan's example? This would be more on-topic, and would provide everything needed to fully answer the question. - Luminiferous
