Python: stop multiple processes when one returns a result?

I am trying to write a simple proof-of-work nonce-finder in Python.

from hashlib import sha256

def proof_of_work(b, nBytes):
    nonce = 0
    # while the first nBytes of hash(b + nonce) are not all zero
    # (uint2bytes is my helper that encodes an integer as bytes)
    while sha256(b + uint2bytes(nonce)).digest()[:nBytes] != bytes(nBytes):
        nonce = nonce + 1
    return nonce

Now I want to parallelize this so it can use all CPU cores and find the nonce faster. My idea is to use multiprocessing.Pool and execute the function proof_of_work multiple times, passing two extra parameters, num_of_cpus_running and this_cpu_id, like so:

def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id):
    # each process starts at its own ID and strides by the number of processes
    nonce = this_cpu_id
    while sha256(b + uint2bytes(nonce)).digest()[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
    return nonce

So, if there are 4 cores, each one will calculate nonces like this:

core 0: 0, 4, 8, 12, 16 ...
core 1: 1, 5, 9, 13, 17 ...
core 2: 2, 6, 10, 14, 18 ...
core 3: 3, 7, 11, 15, 19 ...

So, I have to rewrite proof_of_work so that when any process finds a nonce, all the others stop looking, keeping in mind that the found nonce has to be the lowest value for which the required bytes are 0. If a CPU speeds up for some reason and returns a valid nonce higher than the lowest valid nonce, then the proof of work is not valid.

The only thing I don't know how to do is the part in which process A stops only if process B has found a nonce that is lower than the nonce process A is currently testing. If it is higher, A keeps calculating (just in case) until it reaches the nonce provided by B.

I hope I explained myself correctly. Also, if there is a faster implementation of anything I wrote, I would love to hear about it. Thank you very much!

Brewmaster answered 12/9, 2015 at 10:56 Comment(3)
@MaartenBodewes I am afraid I don't understand your point, maybe I am still too much of a newbie programmer xD. The cpu_id is just a number I assign to every process. First I get the number of CPU cores x = multiprocessing.cpu_count() and then start x processes, each with a different ID that just increments by 1. Hope I got that correct.Brewmaster
Sorry, that was just a misunderstanding on my part. I've never looked into these proof-of-work protocols deeply. I despise things like Bitcoin - I don't like protocols that convert energy into money, I prefer it the other way around.Pratte
@MaartenBodewes proof of work is not only used in Bitcoin. I use it to prevent DDoS attacks.Spunk

One easy option is to use micro-batches and check between batches whether an answer has been found. Batches that are too small incur overhead from starting parallel jobs; batches that are too large cause the other processes to do extra work after one process has already found an answer. To be efficient, each batch should take around 1 to 10 seconds.

Sample code:

from multiprocessing import Pool
from hashlib import sha256
from time import time


def find_solution(args):
    salt, nBytes, nonce_range = args
    target = '0' * nBytes  # note: nBytes counts leading zero hex digits here, not raw bytes

    for nonce in range(nonce_range[0], nonce_range[1]):
        result = sha256((salt + str(nonce)).encode()).hexdigest()

        #print('%s %s vs %s' % (result, result[:nBytes], target)); sleep(0.1)

        if result[:nBytes] == target:
            return (nonce, result)

    return None


def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    pool = Pool(n_processes)

    nonce = 0

    while True:
        nonce_ranges = [
            (nonce + i * batch_size, nonce + (i+1) * batch_size)
            for i in range(n_processes)
        ]

        params = [
            (salt, nBytes, nonce_range) for nonce_range in nonce_ranges
        ]

        # Single-process search:
        #solutions = map(find_solution, params)

        # Multi-process search:
        solutions = pool.map(find_solution, params)

        print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1]-1))

        # Find non-None results
        solutions = [s for s in solutions if s is not None]

        if solutions:
            return solutions

        nonce += n_processes * batch_size


if __name__ == '__main__':
    start = time()
    solutions = proof_of_work('abc', 6)
    print('\n'.join('%d => %s' % s for s in solutions))
    print('Solution found in %.3f seconds' % (time() - start))

Output (on a laptop with a Core i7):

Searched 0 to 1999999
Searched 2000000 to 3999999
Searched 4000000 to 5999999
Searched 6000000 to 7999999
Searched 8000000 to 9999999
Searched 10000000 to 11999999
Searched 12000000 to 13999999
Searched 14000000 to 15999999
Searched 16000000 to 17999999
Searched 18000000 to 19999999
Searched 20000000 to 21999999
Searched 22000000 to 23999999
Searched 24000000 to 25999999
Searched 26000000 to 27999999
Searched 28000000 to 29999999
Searched 30000000 to 31999999
Searched 32000000 to 33999999
Searched 34000000 to 35999999
Searched 36000000 to 37999999
37196346 => 000000f4c9aee9d427dc94316fd49192a07f1aeca52f6b7c3bb76be10c5adf4d
Solution found in 20.536 seconds

With a single core it took 76.468 seconds. This is by no means the most efficient way to find a solution, but it works. For example, if the salt is long, then the SHA-256 state could be pre-computed once the salt has been absorbed, and the brute-force search continued from there. Comparing raw digest bytes could also be more efficient than using hexdigest().
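
As a concrete illustration of the pre-computed-state idea: hashlib hash objects support copy(), so the state after absorbing the salt can be cloned for each nonce instead of re-hashing the salt every time. A sketch reusing the find_solution contract from above (the payoff only matters when the salt is long):

    from hashlib import sha256

    def find_solution_precomputed(args):
        # Same contract as find_solution above, but the salt is absorbed
        # into the SHA-256 state only once; each nonce clones that state.
        salt, nBytes, nonce_range = args
        prefix = sha256(salt.encode())  # salt hashed exactly once
        target = '0' * nBytes
        for nonce in range(nonce_range[0], nonce_range[1]):
            h = prefix.copy()           # cheap copy of the internal hash state
            h.update(str(nonce).encode())
            digest = h.hexdigest()
            if digest[:nBytes] == target:
                return (nonce, digest)
        return None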

Until answered 8/10, 2015 at 12:45 Comment(2)
Excellent show of parallelism. However, does it answer the OP's question of how to stop other threads once one thread has found an answer?Heterodyne
True, it doesn't answer the original question 100%, but it sidesteps the problem: at worst the other workers do only a few seconds of extra work.Until

A general method to do this is to:

  1. think in terms of work packets, e.g. performing the calculation for a particular range; a single packet should not take long, say 0.1 seconds to a second
  2. have some manager distribute the work packets to the workers
  3. after a work packet has been concluded, tell the manager the result and request a new work packet
  4. if the work is done and a result has been found, accept the results from the workers and signal them that no more work is to be performed - the workers can now safely terminate

This way you don't have to check in with the manager on every iteration (which would slow everything down), or do nasty things like stopping a thread mid-session. Needless to say, the manager needs to be thread-safe.

This fits perfectly with your model, as you still need the results of the other workers, even if a result has been found.
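
A minimal sketch of that manager/worker loop with multiprocessing - all names (is_valid, batch, search) are my own placeholders, not an established API, and the real is_valid would be the hash check from the question:

    from multiprocessing import Process, Queue

    def is_valid(nonce):
        # placeholder predicate; stands in for the real hash check
        return nonce == 123456

    def worker(work_q, result_q):
        # request work packets until the manager sends the None sentinel
        for start, end in iter(work_q.get, None):
            found = next((n for n in range(start, end) if is_valid(n)), None)
            result_q.put((start, found))

    def search(n_workers=4, batch=10000):
        work_q, result_q = Queue(), Queue()
        workers = [Process(target=worker, args=(work_q, result_q))
                   for _ in range(n_workers)]
        for w in workers:
            w.start()

        next_start = 0
        outstanding = set()            # starts of packets still in flight
        for _ in range(n_workers):     # prime one packet per worker
            work_q.put((next_start, next_start + batch))
            outstanding.add(next_start)
            next_start += batch

        best = None
        while outstanding:
            start, found = result_q.get()   # a worker finished a packet
            outstanding.discard(start)
            if found is not None and (best is None or found < best):
                best = found
            if best is None:
                # nothing found yet: hand out the next packet
                work_q.put((next_start, next_start + batch))
                outstanding.add(next_start)
                next_start += batch
            else:
                # a solution exists; only wait for packets covering lower nonces
                outstanding = {s for s in outstanding if s < best}

        for _ in workers:              # stop signal for every worker
            work_q.put(None)
        for w in workers:
            w.join()
        return best

    if __name__ == '__main__':
        print(search())

The key property is that a worker only talks to the manager once per packet, and once a solution is known the manager stops issuing packets above it but still waits for the in-flight packets that could contain a lower nonce.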


Note that in your model a thread may go out of sync with the others, lagging behind. You don't want to do another million calculations once a result has been found. I'm reiterating this from the question because I think the model is wrong: you should fix the model instead of fixing the implementation.

Pratte answered 12/9, 2015 at 13:53 Comment(4)
A novice mistake: giving the manager a low priority. As the manager isn't really doing anything most of the time, it should have a priority at least as high as the workers, preferably higher. Otherwise a worker may have to wait forever for a new packet. And yes, I was that novice once :)Pratte
Note: I'm not an experienced Python dev. If somebody posts a reliable implementation of above in Python, feel free to accept that one.Pratte
Much more flexible too. Add/remove workers, change size of work packet, add option for user to indicate when to stop, etc. etc.Pratte
Thank you very much, this really helped. Will wait if someone comes up with some code, but in the meantime will try to implement this.Brewmaster

You can use multiprocessing.Queue(). Have one Queue per CPU/process. When a process finds a nonce, it puts it on the queues of the other processes. The other processes check their own queue (non-blocking) on each iteration of the while loop, and if there is anything on it, they decide whether to continue or terminate based on the value in the queue:

from hashlib import sha256
from queue import Empty  # multiprocessing.Queue raises queue.Empty

def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id, qSelf, qOthers):
    nonce = this_cpu_id
    bound = None  # lowest nonce reported by another process so far
    while sha256(b + uint2bytes(nonce)).digest()[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
        try:
            # remember the other process's nonce instead of discarding it
            otherNonce = qSelf.get(block=False)
            if bound is None or otherNonce < bound:
                bound = otherNonce
        except Empty:
            pass
        if bound is not None and bound < nonce:
            return None  # someone already found a lower nonce: stop
    for q in qOthers:
        q.put(nonce)  # tell everyone else what we found
    return nonce

qOthers is a list of queues (each queue = multiprocessing.Queue()) belonging to the other processes.

If you decide to use queues as I suggested, you should be able to write a better/nicer implementation of the above approach.
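
For what it's worth, a minimal sketch of the wiring this implies, assuming the proof_of_work (and uint2bytes helper) above; the extra result_q for reporting winners back to the parent is my addition, not part of the answer:

    from multiprocessing import Process, Queue

    def worker(b, nBytes, n, cpu_id, q_self, q_others, result_q):
        # thin wrapper so the parent can collect what each process found
        found = proof_of_work(b, nBytes, n, cpu_id, q_self, q_others)
        if found is not None:
            result_q.put(found)

    if __name__ == '__main__':
        n = 4
        queues = [Queue() for _ in range(n)]  # one inbox per process
        result_q = Queue()
        procs = []
        for cpu_id in range(n):
            q_others = [q for i, q in enumerate(queues) if i != cpu_id]
            p = Process(target=worker,
                        args=(b'block data', 2, n, cpu_id,
                              queues[cpu_id], q_others, result_q))
            procs.append(p)
            p.start()
        for p in procs:
            p.join()
        # more than one process may report a nonce; keep the lowest
        # (empty() is acceptable here since all producers have been joined)
        found = []
        while not result_q.empty():
            found.append(result_q.get())
        print('lowest nonce found:', min(found))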

Cawley answered 5/10, 2015 at 21:9 Comment(0)

I'd like to improve on NikoNyrh's answer by changing pool.map to pool.imap_unordered. imap_unordered yields results from the workers as soon as they complete, without waiting for all of them to finish, so as soon as any result is a tuple (a found solution) we can exit the while loop. (Note that results arrive in completion order, so the first solution yielded is not necessarily the one with the lowest nonce.)

def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    with Pool(n_processes) as pool:

        nonce = 0

        while True:
            nonce_ranges = [
                (nonce + i * batch_size, nonce + (i+1) * batch_size)
                for i in range(n_processes)
            ]

            params = [
                (salt, nBytes, nonce_range) for nonce_range in nonce_ranges
            ]

            print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1] - 1))

            for result in pool.imap_unordered(find_solution, params):
                if isinstance(result, tuple):
                    return result
            nonce += n_processes * batch_size


Portrait answered 12/2, 2021 at 18:3 Comment(0)
