Pausing two Python threads while a third one does stuff (with locks?)
R

5

4

I'm new to concurrent programming.

I'd like to execute three tasks repeatedly. The first two should run all the time, the third should run every hour or so. The first two tasks can run in parallel, but I always want to pause them while the third task is running.

Here's the skeleton of what I've tried:

import threading
import time

flock = threading.Lock()
glock = threading.Lock()

def f():
    while True:
        with flock:
            print 'f'
            time.sleep(1)

def g():
    while True:
        with glock:
            print 'g'
            time.sleep(1)

def h():
    while True:
        with flock:
            with glock:
                print 'h'
        time.sleep(5)

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()

I would expect this code to print an f and a g every second, and an h about every five seconds. However, when I run it, it takes around 12 f's and 12 g's before I start seeing any h's. It looks like the first two threads constantly release and re-acquire their locks while the third thread is left out of the loop.

  1. Why is that? When the third thread tries to acquire a currently held lock, and it is then released, shouldn't acquisition immediately succeed instead of the first/second thread immediately acquiring it again? I am probably misunderstanding something.
  2. What would be a good way to achieve what I want?

Note: moving the time.sleep(1) calls out of the with flock/glock block works for this simple example, but apparently not for my real application where the threads spend most of their time doing the actual operations. When the first two threads sleep for a second after each execution of the loop body, with the lock released, the third task still never gets executed.

Radiotherapy answered 12/11, 2011 at 10:24 Comment(2)
With your exact code, I see 'h' every 5-6 seconds in both Python2.7 and Python 3.2. I even experimented with removing the sleeps completely, and taking them out of the lock. What OS are you using? (I'm on Windows).Centrum
I tried Linux 2.6.24-1-amd64 #1 SMP x86_64 GNU/Linux with Python 2.7.1 and Linux 2.6.32-5-686 #1 SMP i686 GNU/Linux with Python 2.6.6Radiotherapy
A
5

How about doing it with threading.Event objects:

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(resume,is_waiting,name):
    while True:
        if not resume.is_set():
            is_waiting.set()
            logger.debug('{n} pausing...'.format(n=name))
            resume.wait()
            is_waiting.clear()
        logger.info(name)
        time.sleep(1)

def h(resume,waiters):
    while True:
        logger.debug('halt') 
        resume.clear()
        for i,w in enumerate(waiters):
            logger.debug('{i}: wait for worker to pause'.format(i=i))
            w.wait()
        logger.info('h begin')
        time.sleep(2)
        logger.info('h end')        
        logger.debug('resume')
        resume.set()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

# set means resume; clear means halt
resume = threading.Event()
resume.set()

waiters=[]
for name in 'fg':
    is_waiting=threading.Event()
    waiters.append(is_waiting)
    threading.Thread(target=f,args=(resume,is_waiting,name)).start()    
threading.Thread(target=h,args=(resume,waiters)).start()

yields

[07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt

(In response to a question in the comments) This code tries to measure how long it takes for the h-thread to acquire each lock from the other worker threads.

It seems to show that even if h is waiting to acquire a lock, the other worker thread may with fairly high probability release and reacquire the lock. There is no priority given to h just because it has been waiting longer.

David Beazley has presented at PyCon about problems related to threading and the GIL. Here is a pdf of the slides. It is a fascinating read and may help explain this as well.

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(lock,n):
    while True:
        with lock:
            logger.info(n)
            time.sleep(1)

def h(locks):
    while True:
        t=time.time()
        for n,lock in enumerate(locks):
            lock.acquire()
            t2=time.time()
            logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
            t=t2
        t2=time.time()
        logger.info('h {d}'.format(d=t2-t))
        t=t2
        for lock in locks:
            lock.release()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

locks=[]
N=5
for n in range(N):
    lock=threading.Lock()
    locks.append(lock)
    t=threading.Thread(target=f,args=(lock,n))
    t.start()

threading.Thread(target=h,args=(locks,)).start()
Amadis answered 12/11, 2011 at 11:52 Comment(3)
I'd still like to understand why my original code doesn't work as expected. Is it indeed the case that one thread's fresh lock.acquire() may immediately succeed while another thread's lock.acquire() is already blocking?Radiotherapy
My understanding of threads is mainly empirical. I don't think I can explain this on a deep level. I'm posting some code which I think may shed a little light on what is going on. For the h-thread to acquire, say, flock, it must acquire the GIL at a time when the f-thread has released flock. The mechanics of how that happens are not something I can explain in detail. But the code (I'll post it above) shows it doesn't happen with the desired frequency.Amadis
Thinking about my question further, yes, of course that may happen - after all, there's no guarantees as to the order in which requests to acquire a lock are satisfied. What I was actually assuming was that when a lock is released in a thread T and some requests to acquire the lock are currently blocking, at least one of these requests will be satisfied before anything else happens in T. But apparently, this is not guaranteed either.Radiotherapy
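
The point reached in the comments above (a released lock is not handed to whichever thread has waited longest) suggests making the priority explicit instead of relying on lock scheduling. The following is a minimal sketch of that idea, not code from any of the answers: it assumes a hypothetical `PauseGate` class built on `threading.Condition`, in which workers may overlap with each other, but no worker can start a new work section once the third task has asked for a pause.

```python
import threading


class PauseGate:
    """Hypothetical helper: workers run concurrently, a pauser gets priority."""

    def __init__(self):
        self._cond = threading.Condition()
        self._active = 0       # workers currently inside a work section
        self._paused = False   # True while a pause is pending or in progress

    def worker_enter(self):
        with self._cond:
            # A pending pause blocks new work sections, so the pauser
            # cannot be starved by workers re-entering immediately.
            while self._paused:
                self._cond.wait()
            self._active += 1

    def worker_exit(self):
        with self._cond:
            self._active -= 1
            self._cond.notify_all()

    def pause(self):
        with self._cond:
            while self._paused:        # wait for any other pauser to finish
                self._cond.wait()
            self._paused = True
            while self._active:        # wait for running work sections to end
                self._cond.wait()

    def resume(self):
        with self._cond:
            self._paused = False
            self._cond.notify_all()


def worker_step(gate, work):
    """One loop iteration of f or g."""
    gate.worker_enter()
    try:
        work()
    finally:
        gate.worker_exit()


def exclusive_step(gate, work):
    """One run of h; no worker_step is in progress while work() runs."""
    gate.pause()
    try:
        work()
    finally:
        gate.resume()
```

With this sketch, f and g would call `worker_step(gate, ...)` inside their `while True` loops and h would call `exclusive_step(gate, ...)` before sleeping. The pause takes effect only at the boundaries between work sections, so a long-running section delays h by at most that section's duration.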
M
1

The simplest way to do this is with 3 Python processes. If you are doing this on Linux, then the hourly process can send a signal to cause the other tasks to pause, or you could even kill them and then restart after the hourly task is complete. No need for threads.

However, if you are determined to use threads, then try to share NO data whatsoever between threads; just send messages back and forth (also known as data copying rather than data sharing). Threading is hard to get right.

Using multiple processes, on the other hand, forces you to share nothing, and is therefore much easier to do correctly. If you use a library like 0MQ http://www.zeromq.org to do your message passing, then it is easy to move from a threading model to a multiple process model.

Merrilee answered 12/11, 2011 at 10:37 Comment(2)
I'm afraid my threads do need to share some data, that's why I decided to use threads and not processes in the first place.Radiotherapy
Are you sure that you can't share the data needed in the moment by some other means? One way is to send a copy in a message, but another way that I have used is to put the shared data in a memcache. This is a case where restructuring a program makes it easier to write reliable high performance code.Merrilee
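
The signal-based pausing suggested in this answer could look roughly like the sketch below. It is an illustration under stated assumptions, not the answerer's code: it is POSIX only, it uses `SIGSTOP` and `SIGCONT` as the pause and resume signals (the answer does not say which signal to use), and `run_while_paused` and `start_worker` are hypothetical names.

```python
import os
import signal
import subprocess
import sys


def run_while_paused(pids, task):
    """Suspend the processes in `pids`, run `task()`, then resume them.

    POSIX only: relies on SIGSTOP/SIGCONT.
    """
    stopped = []
    try:
        for pid in pids:
            os.kill(pid, signal.SIGSTOP)   # the target cannot catch or ignore this
            stopped.append(pid)
        return task()
    finally:
        for pid in stopped:
            os.kill(pid, signal.SIGCONT)   # the worker continues where it was frozen


def start_worker(label):
    """Hypothetical stand-in for a real worker: prints `label` once a second."""
    code = (
        "import time\n"
        "while True:\n"
        "    print(%r, flush=True)\n"
        "    time.sleep(1)\n" % label
    )
    return subprocess.Popen([sys.executable, "-c", code])
```

A driver would start the two workers with `start_worker('f')` and `start_worker('g')`, then call `run_while_paused([w.pid for w in workers], hourly_task)` once per interval. Note that `SIGSTOP` freezes a worker wherever it happens to be; if the workers must stop only at a safe point, a catchable signal with a handler in the worker would be needed instead.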
C
1

Using communication for synchronization:

#!/usr/bin/env python
import threading
import time
from Queue import Empty, Queue

def f(q, c):
    while True:
        try: q.get_nowait(); q.get() # get PAUSE signal      
        except Empty: pass  # no signal, do our thing
        else: q.get()       # block until RESUME signal
        print c,
        time.sleep(1)

def h(queues):
    while True:
        for q in queues:
            q.put_nowait(1); q.put(1) # block until PAUSE received
        print 'h'
        for q in queues:
            q.put(1) # put RESUME
        time.sleep(5)

queues = [Queue(1) for _ in range(2)]
threading.Thread(target=f, args=(queues[0], 'f')).start()
threading.Thread(target=f, args=(queues[1], 'g')).start()
threading.Thread(target=h, args=(queues,)).start()

It might not be optimal from a performance point of view, but I find it much easier to follow.

Output

f g
f g h
f g f g g f f g g f g f f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
Cachalot answered 12/11, 2011 at 13:16 Comment(2)
Doesn't this risk concurrent execution of print c, and print 'h'? Like, when the H thread puts the pause signal into the queue and then proceeds to do its work, the other two threads might still be working, right? That's what I want to avoid.Radiotherapy
@ke.: yes, it is a possibility. You said that the 'h' thread starts once an hour. I allowed the threads to overlap for one cycle. (Queue() can't use maxsize=0 for this, because that means the size is unlimited, so you need to put() twice to block on a Queue with maxsize=1.) I've updated the answer.Cachalot
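
The remark about `maxsize` can be checked in isolation. The snippet below is a small sketch written for Python 3, where the module is named `queue` rather than `Queue`; the variable names are illustrative only. It shows that the second `put()` on a `Queue(1)` blocks (here it times out, because nothing consumes the first item), while a queue created with `maxsize=0` never fills up.

```python
import queue  # this module is called `Queue` in Python 2

q = queue.Queue(maxsize=1)
q.put_nowait("PAUSE")            # the first item fits; the queue is now full

try:
    # The second put blocks until a consumer calls get(). Nobody does here,
    # so the call gives up after the timeout and raises queue.Full.
    q.put("PAUSE", timeout=0.1)
    second_put_blocked = False
except queue.Full:
    second_put_blocked = True

unbounded = queue.Queue(maxsize=0)   # maxsize <= 0 means the size is unlimited
for i in range(1000):
    unbounded.put_nowait(i)          # never raises queue.Full
```

In the answer's code the consumer does call `get()`, so the blocked second `put()` returns exactly when the worker has picked up the pause signal, which is what makes the handshake work.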
J
0

How about a semaphore initialized to 2? F and G wait and signal one unit, H waits and signals 2 units.

Jaquelinejaquelyn answered 12/11, 2011 at 11:12 Comment(4)
You mean like in the above example, but replace all references to flock and glock with references to a single semaphore object, constructed using threading.Semaphore(2)? I'm afraid the resulting program exhibits the same problem.Radiotherapy
Hmm.. I was expecting improved performance because H is waiting for either release, not just one at a time. I would then try lowering the priority of F,G or raising H so that, when a unit becomes free, H runs and acquires the unit instead of F or G. H should wait for its 2 units in two loops, i.e. two calls for one unit each, not one call for two units.Jaquelinejaquelyn
There's no simple way to set thread priorities in Python, is there?Radiotherapy
Also, I'm not sure what you mean by "wait for its 2 units in two loops". What I tried is this: with semaphore: with semaphore: print 'h' Is this what you mean?Radiotherapy
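
For reference, the semaphore arrangement discussed in this answer might be written as in the sketch below, with H taking its two units one call at a time. The helper names are hypothetical, and the sketch only shows the structure: as the asker reports above, this arrangement by itself does not stop F and G from re-acquiring their units before H gets both.

```python
import threading

units = threading.Semaphore(2)


def acquire_units(sem, n):
    """Take n units one at a time; each acquire() may block."""
    for _ in range(n):
        sem.acquire()


def release_units(sem, n):
    """Give back n units."""
    for _ in range(n):
        sem.release()


def worker_step(sem, work):
    """One iteration of F or G: hold a single unit while working."""
    with sem:
        work()


def exclusive_step(sem, work):
    """One run of H: hold both units, so neither F nor G can be working."""
    acquire_units(sem, 2)
    try:
        work()
    finally:
        release_units(sem, 2)
```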
A
0

What about this approach? It may be contentious, because I know 'global' variables are supposedly a big no-no when it comes to threading (I'm a newbie, so still learning):

import threading, time

def f():
    global BL
    while True:
        BL = 'f' if BL == -1 else BL
        if BL == 'f':
            print('f')
            BL = -1
            ss(0.1)

def g():
    global BL
    while True:
        BL = 'g' if BL == -1 else BL
        if BL == 'g':
            print('g')
            BL = -1
            ss(0.1)

def h():
    global BL
    while True:
        BL = 'h' if BL == -1 and (tt() - start) % delay_3rd <= 0.1 and (tt()-start) > 1 else BL
        if (BL == 'h'):
           print('h')
           print(f' seconds: {round(tt() - start,None)}!!'*100)
           BL = -1
           ss(0.1)


BL, delay_3rd = -1, 5
ss, tt = time.sleep, time.time
start = tt()

The third one runs every delay_3rd seconds (you could set delay_3rd = 3600 for hourly intervals), whereas the first two run all the time (per your request/intention):

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()

(output after about 4-5 secs of running...)

f

h seconds: 5!!

g

f

g

f

f

g

f

h

g

f

g

h seconds: 6!!

f

g

f

g

f

g

f

g

f

g

f

g

f

h

seconds: 7!!

g

f

g

(notice h only appears every second; f & g appear intermittently throughout...)

Accelerate answered 28/1, 2021 at 5:44 Comment(1)
(I.e. this is the closest 'code' to your original submission: it replaces flock/glock with a global variable, and that's about it really. Oh yes, and I include an 'if' statement, i.e. BL = 'f/g/h' if BL == -1 else BL.)Accelerate
