PHP pthreads - shared objects

I'm searching for a safe and fast way to use a shared object.

I already asked this question here: https://github.com/krakjoe/pthreads/issues/470 but obviously that wasn't the right place.

I'm trying to share an object (Threaded) with many other contexts (Thread). All threads update this shared object: they can set their own requests and also have to respond to requests from others.

Now that krakjoe has responded that lock/unlock won't be available in PHP 7, I have a problem.

I know about ::synchronized but have no idea how to use it for my needs.

How can I use ::synchronized to write methods like

  • lock()
  • unlock()
  • is_locked() -- check if locked, and if so, don't try; just go on and try again later

EDIT:

I wrote a (IMO) very simple test script.

This script includes no sync/lock/... methods at the moment.

It should just show what I'm trying to do.

I'm still searching for a way to use ::synchronized to make this shared object safe.

Code:

<?php
/*
TEST:
    create n threads
    each will
        - Shared::set() its own ref
        - check if Shared::exists() its own ref
        - Shared::get() its ref back
        - call method ::isRunning() at returned val to easily check if is ref or got overwritten by another context

TODO:
    using ::synchronized to handle multi-context-access

NOTES:
    every method as public to prevent pthreads v2 "Method Modifiers - Special Behaviour"
        see: "Method Modifiers - Special Behaviour"
            at http://blog.krakjoe.ninja/2015/08/a-letter-from-future.html
*/
class Shared extends Threaded
{
    public $data;
    public function exists($ident)
    {
        return isset($this->data[$ident]);
    }
    public function set($ident, $ref)
    {
        $return = false;
        if(!isset($this->data[$ident])){
            $data = $this->data;
            $data[$ident] = $ref;
            $this->data = $data;
            $return = $this->data[$ident];
        }
        #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
        return $return;
    }
    public function get($ident)
    {
        $return = false;
        if($this->exists($ident) === true){
            $data = $this->data;
            $return = $data[$ident];
            unset($data[$ident]);
            $this->data = $data;
        }
        #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
        return $return;
    }
}

class T extends Thread
{
    public $count;
    public function __construct(Shared $Shared, $ident)
    {
        $this->Shared = $Shared;
        $this->ident = $ident;
    }
    public function run()
    {
        $slowdown = true;
        $this->count = 0;
        while(true){
            if($slowdown){
                // "don't allow usleep or sleep" : https://github.com/krakjoe/pthreads/commit/a157b34057b0f584b4db326f30961b5c760dead8
                //  loop a bit to simulate work:
                $start = microtime(true);
                $until = rand(1, 100000)/1000000;
                while(microtime(true)-$start < $until){
                    // ...
                }
            }

            if($this->Shared->exists($this->ident) === true){
                $ref = $this->Shared->get($this->ident);
            }
            else{
                $ref = $this->Shared->set($this->ident, $this);
            }
            // calling a method on $ref -- if not a ref we crash
            $ref->isRunning();
            unset($ref);
            $this->count++;
        }
    }
}


echo 'start ...' . PHP_EOL;

$n = 8;
$Shared = new Shared();
for($i = 0, $refs = array(); $i < $n; $i++){
    $refs[$i] = new T($Shared, $i);
    $refs[$i]->start();
}

while(!empty($refs)){
    // print status:
    if(!isset($t)or microtime(true)-$t > 1){
        $t = microtime(true);
        echo 'status: ' . count($refs) . ' running atm ...' . PHP_EOL;
    }

    // join crashed threads:
    foreach($refs as $i => $thread){
        if($thread->isRunning() === false){
            echo 'T-' . $i . ' stopped after ' . $thread->count . PHP_EOL;
            if($thread->isJoined() === false){
                $thread->join();
            }
            unset($refs[$i]);
        }
    }
}

echo 'no thread running anymore.' . PHP_EOL;

/* output
start ...
status: 8 running atm ...

Notice: Undefined offset: 6 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-6 stopped after 10
status: 7 running atm ...

Notice: Undefined offset: 4 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-4 stopped after 35
status: 6 running atm ...

Notice: Undefined offset: 7 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-7 stopped after 43
status: 5 running atm ...
status: 5 running atm ...
status: 5 running atm ...

[...]
*/
?>
Strangeness answered 1/9, 2015 at 22:17 Comment(1)
this is a great question - Driscoll

Threaded objects are already thread safe: any time you read, write, check for the existence of, or delete (unset) a member, the operation is atomic, and no other context can perform any of those operations while the first one is in progress. The same is true for engine handlers that the user is unaware of; everything down to the lowest level is implicitly safe.

Quite reassuring. However, this has obvious limits when the logic gets more complex, such as checking the existence of a member before setting it or doing something else with it, as you are doing. While the individual operations on the object are atomic, nothing stops another context from unsetting a member between the call to isset and the read of the property/dimension.

This applies to PHP7 (pthreads v3+)

Safety and integrity are two different things here. When integrity is important, you can use Threaded::synchronized in PHP7 to preserve it correctly. In PHP5 you could also preserve it, but the code would be more complicated, as would the explanation.
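As a minimal sketch of that distinction, assuming pthreads v3 on a thread-safe PHP 7 build: the hypothetical Registry class below (its name and the claim/take methods are made up for illustration and are not part of the question's code) groups the existence check with the write, and the read with the removal, inside one synchronized block each. Values are plain strings to keep the example small.

```php
<?php
// Hypothetical helper for illustration; assumes the pthreads v3 extension.
class Registry extends Threaded
{
    // Store $value under $ident only if the slot is free; true on success.
    public function claim(string $ident, string $value): bool
    {
        return $this->synchronized(function () use ($ident, $value) {
            if (isset($this[$ident])) {
                return false; // another context claimed it first
            }
            $this[$ident] = $value;
            return true;
        });
    }

    // Read and remove as one grouped operation; null when nothing is stored.
    public function take(string $ident)
    {
        return $this->synchronized(function () use ($ident) {
            if (!isset($this[$ident])) {
                return null;
            }
            $value = $this[$ident];
            unset($this[$ident]);
            return $value;
        });
    }
}

$registry = new Registry();
var_dump($registry->claim('a', 'first'));  // slot is free, so it is stored
var_dump($registry->claim('a', 'second')); // slot is taken, so it is rejected
var_dump($registry->take('a'));            // returns and removes the value
var_dump($registry->take('a'));            // nothing left to take
```

Without the synchronized wrapper, each isset, read and unset would still be atomic on its own, but another context could run between them, which is the failure shown in the question's output.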

Your second example should run indefinitely, if I understand its logic. I'm using that assumption to construct the correct code, and I'll make further assumptions about what you might want to do in this endless loop, providing some insight where it seems required.

<?php
class Referee extends Threaded {

    public function find(string $ident, Threaded $reference) {
        return $this->synchronized(function () use($ident, $reference) {
            if (isset($this[$ident])) {
                return $this[$ident];
            } else return ($this[$ident] = $reference);
        });
    }

    public function foreach(Closure $closure) {
        $this->synchronized(function() use($closure) {
            foreach ($this as $ident => $reference) {
                $closure($ident, $reference);
            }
        });
    }
}

class Test extends Thread {

    public function __construct(Referee $referee, string $ident, bool $delay) {
        $this->referee = $referee;
        $this->ident   = $ident;
        $this->delay   = $delay;
    }

    public function run() {
        while (1) {
            if ($this->delay) {
                $this->synchronized(function(){
                    $this->wait(1000000);
                });
            }

            $reference = 
                $this->referee->find($this->ident, $this);

            /* do something with reference here, I guess */         

            /* do something with all references here */
            $this->referee->foreach(function($ident, $reference){
                var_dump(Thread::getCurrentThreadId(),
                        $reference->getIdent(), 
                        $reference->isRunning());
            });
        }
    }

    public function getIdent() {
        return $this->ident;
    }

    private $referee;
    private $ident;
    private $delay;
}

$referee = new Referee();
$threads = [];
$thread = 0;
$idents = [
    "smelly",
    "dopey",
    "bashful",
    "grumpy",
    "sneezy",
    "sleepy",
    "happy",
    "naughty"
];

while ($thread < 8) {
    $threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1));
    $threads[$thread]->start();
    $thread++;
}

foreach ($threads as $thread)
    $thread->join();
?>

Let's look at the differences: I'll explain why they are as they are and how else you might write them. You already know that we're not talking about safety now, but integrity; you are afforded the (quite remarkable) assumption that anything you write is "safe", as explained.

The first major difference is this:

if ($this->delay) {
    $this->synchronized(function(){
        $this->wait(1000000);
    });
}

This is simply a suitable way to make a Thread wait. You don't have to use the Thread itself to synchronize; you could use any Threaded object. The benefit of doing things properly, in case it's not clear, is that sleep and usleep do not leave threads in a receptive state, whereas ::wait does.

In the real world, where you really should only ever wait for something, that would be a more complex block; it might (and should) look more like:

if ($this->delay) {
    $this->synchronized(function(){
        while ($this->condition) {
            $this->wait(1000000);
        }
    });
}

Note: waiting for a timeout is technically waiting for something; however, you might be woken by something other than the timeout being reached, and code should be prepared for that.

That way, another context can tell the Thread to stop waiting and shut down gracefully, or to carry out some other important action immediately, simply by synchronizing, changing a condition, and notifying the Thread.
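A rough sketch of that shutdown handshake, again assuming pthreads v3: the Waiter class, its stop flag and the shutdown method are hypothetical names chosen for this example, not anything from the question or from pthreads itself.

```php
<?php
// Hypothetical worker for illustration; assumes the pthreads v3 extension.
class Waiter extends Thread
{
    public $stop;

    public function __construct()
    {
        $this->stop = false;
    }

    public function run()
    {
        $this->synchronized(function () {
            // Re-check the condition after every wake-up: wait() can return
            // because of a notify or because the timeout elapsed.
            while (!$this->stop) {
                $this->wait(1000000); // timeout in microseconds
            }
        });
    }

    // Called from another context: synchronize, change the condition, notify.
    public function shutdown()
    {
        $this->synchronized(function () {
            $this->stop = true;
            $this->notify();
        });
    }
}

$waiter = new Waiter();
$waiter->start();
$waiter->shutdown(); // wakes the thread if it is already waiting
$waiter->join();
```

Because the flag is checked inside the same synchronized block that waits, it does not matter whether shutdown() runs before or after the thread starts waiting; in both cases the loop sees the flag and exits.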

For predictable code, it's extremely important to get comfortable with how synchronized, wait and notify work.

Next we have our logic for setting and/or getting the reference:

$reference = 
    $this->referee->find($this->ident, $this);

Which invokes this:

public function find(string $ident, Threaded $reference) {
    return $this->synchronized(function () use($ident, $reference) {
        if (isset($this[$ident])) {
            return $this[$ident];
        } else return ($this[$ident] = $reference);
    });
}

This is badly named (naming things is hard), but you can see that integrity is preserved by synchronization while these grouped operations take place. With a bit of tweaking, the same method could also be used to fetch a reference to another object.

I guess you do something with that particular reference (which is always going to be $this currently). I can't guess what. Moving on ...

I've made the assumption that you'll want to do something with each of these Threads, and that you want to preserve the integrity of the data while the entire iteration takes place:

$this->referee->foreach(function($ident, $reference){
    var_dump(Thread::getCurrentThreadId(),
            $reference->getIdent(), 
            $reference->isRunning());
});

Which invokes:

public function foreach(Closure $closure) {
    $this->synchronized(function() use($closure) {
        foreach ($this as $ident => $reference) {
            $closure($ident, $reference);
        }
    });
}

This is how you would do such a thing.

It's worth mentioning that synchronized is not necessarily required here: just as nothing bad will happen if you remove a member from an array you are iterating over, nothing bad will happen if you unset, set, or do anything else to an object while iteration occurs.

Metacarpus answered 6/9, 2015 at 8:24 Comment(0)
