Ratchet PHP WAMP - React / ZeroMQ - Specific user broadcast
Asked Answered
M

2

18

Note: This is not the same as this question which utilises MessageComponentInterface. I am using WampServerInterface instead, so this question pertains to that part specifically. I need an answer with code examples and an explanation, as I can see this being helpful to others in the future.

Attempting looped pushes for individual users

I'm using the WAMP part of Ratchet and ZeroMQ, and I currently have a working version of the push integration tutorial.

I'm attempting to perform the following:

  • The zeromq server is up and running, ready to log subscribers and unsubscribers
  • A user connects in their browser over the websocket protocol
  • A loop is started which sends data to the specific user who requested it
  • When the user disconnects, the loop for that user's data is stopped

I have points (1) and (2) working, however the issue I have is with the third one:

Firstly: How can I send data to each specific user only? Broadcast sends it to everyone, unless maybe the 'topics' end up being individual user IDs maybe?

Secondly: I have a big security issue. If I'm sending which user ID wants to subscribe from the client-side, which it seems like I need to, then the user could just change the variable to another user's ID and their data is returned instead.

Thirdly: I'm having to run a separate php script containing the code for zeromq to start the actual looping. I'm not sure this is the best way to do this and I would rather having this working completely within the codebase as opposed to a separate php file. This is a major area I need sorted.

The following code shows what I currently have.

The server that just runs from console

I literally type php bin/push-server.php to run this. Subscriptions and un-subscriptions are output to this terminal for debugging purposes.

$loop   = React\EventLoop\Factory::create();
$pusher = Pusher;

$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onMessage'));

$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\WebSocket\WsServer(
        new Ratchet\Wamp\WampServer(
            $pusher
        )
    ),
    $webSock
);

$loop->run();

The Pusher that sends out data over websockets

I've omitted the useless stuff and concentrated on the onMessage() and onSubscribe() methods.

public function onSubscribe(ConnectionInterface $conn, $topic) 
{
    $subject = $topic->getId();
    $ip = $conn->remoteAddress;

    if (!array_key_exists($subject, $this->subscribedTopics)) 
    {
        $this->subscribedTopics[$subject] = $topic;
    }

    $this->clients[] = $conn->resourceId;

    echo sprintf("New Connection: %s" . PHP_EOL, $conn->remoteAddress);
}

public function onMessage($entry) {
    $entryData = json_decode($entry, true);

    var_dump($entryData);

    if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) {
        return;
    }

    $topic = $this->subscribedTopics[$entryData['topic']];

    // This sends out everything to multiple users, not what I want!!
    // I can't send() to individual connections from here I don't think :S
    $topic->broadcast($entryData);
}

The script to start using the above Pusher code in a loop

This is my issue - this is a separate php file that hopefully may be integrated into other code in the future, but currently I'm not sure how to use this properly. Do I grab the user's ID from the session? I still need to send it from client-side...

// Thought sessions might work here but they don't work for subscription
session_start();
$userId = $_SESSION['userId'];

$loop   = React\EventLoop\Factory::create();

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");

$i = 0;
$loop->addPeriodicTimer(4, function() use ($socket, $loop, $userId, &$i) {

   $entryData = array(
       'topic'     => 'subscriptionTopicHere',
       'userId'    => $userId
    );
    $i++;

    // So it doesn't go on infinitely if run from browser
    if ($i >= 3)
    {
        $loop->stop();
    }

    // Send stuff to the queue
    $socket->send(json_encode($entryData));
});

Finally, the client-side js to subscribe with

$(document).ready(function() { 

    var conn = new ab.Session(
        'ws://localhost:8080' 
      , function() {            
            conn.subscribe('topicHere', function(topic, data) {
                console.log(topic);
                console.log(data);
            });
        }
      , function() {          
            console.warn('WebSocket connection closed');
        }
      , {                       
            'skipSubprotocolCheck': true
        }
    );
});

Conclusion

The above is working, but I really need to figure out the following:

  • How can I send individual messages to individual users? When they visit the page that starts the websocket connection in JS, should I also be starting the script that shoves stuff into the queue in PHP (the zeromq)? That's what I'm currently doing manually, and it just feels wrong.

  • When subscribing a user from JS, it can't be safe to grab the users id from the session and send that from client-side. This could be faked. Please tell me there is an easier way, and if so, how?

Madigan answered 28/9, 2013 at 10:16 Comment(0)
M
22

Note: My answer here does not include references to ZeroMQ, as I am not using it any more. However, I'm sure you will be able to figure out how to use ZeroMQ with this answer if you need to.

Use JSON

First and foremost, the Websocket RFC and WAMP Spec state that the topic to subscribe to must be a string. I'm cheating a little here, but I'm still adhering to the spec: I'm passing JSON through instead.

{
    "topic": "subject here",
    "userId": "1",
    "token": "dsah9273bui3f92h3r83f82h3"
}

JSON is still a string, but it allows me to pass through more data in place of the "topic", and it's simple for PHP to do a json_decode() on the other end. Of course, you should validate that you actually receive JSON, but that's up to your implementation.

So what am I passing through here, and why?

  • Topic

The topic is the subject the user is subscribing to. You use this to decide what data you pass back to the user.

  • UserId

Obviously the ID of the user. You must verify that this user exists and is allowed to subscribe, using the next part:

  • Token

This should be a one use randomly generated token, generated in your PHP, and passed to a JavaScript variable. When I say "one use", I mean every time you reload the page (and, by extension, on every HTTP request), your JavaScript variable should have a new token in there. This token should be stored in the database against the User's ID.

Then, once a websocket request is made, you match the token and user id to those in the database to make sure the user is indeed who they say they are, and they haven't been messing around with the JS variables.

Note: In your event handler, you can use $conn->remoteAddress to get the IP of the connection, so if someone is trying to connect maliciously, you can block them (log them or something).

Why does this work?

It works because every time a new connection comes through, the unique token ensures that no user will have access to anyone else's subscription data.

The Server

Here's what I am using for running the loop and event handler. I am creating the loop, doing all the decorator style object creation, and passing in my EventHandler (which I'll come to soon) with the loop in there too.

$loop = Factory::create();

new IoServer(
    new WsServer(
        new WampServer(
            new EventHandler($loop) // This is my class. Pass in the loop!
        )
    ),
    $webSock
);

$loop->run();

The Event Handler

class EventHandler implements WampServerInterface, MessageComponentInterface
{
    /**
     * @var \React\EventLoop\LoopInterface
     */
    private $loop;

    /**
     * @var array List of connected clients
     */
    private $clients;

    /**
     * Pass in the react event loop here
     */
    public function __construct(LoopInterface $loop)
    {
        $this->loop = $loop;
    }

    /**
     * A user connects, we store the connection by the unique resource id
     */
    public function onOpen(ConnectionInterface $conn)
    {
        $this->clients[$conn->resourceId]['conn'] = $conn;
    }

    /**
     * A user subscribes. The JSON is in $subscription->getId()
     */
    public function onSubscribe(ConnectionInterface $conn, $subscription)
    {
        // This is the JSON passed in from your JavaScript
        // Obviously you need to validate it's JSON and expected data etc...
        $data = json_decode(subscription->getId());
        
        // Validate the users id and token together against the db values
        
        // Now, let's subscribe this user only
        // 5 = the interval, in seconds
        $timer = $this->loop->addPeriodicTimer(5, function() use ($subscription) {
            $data = "whatever data you want to broadcast";
            return $subscription->broadcast(json_encode($data));
        });

        // Store the timer against that user's connection resource Id
        $this->clients[$conn->resourceId]['timer'] = $timer;
    }

    public function onClose(ConnectionInterface $conn)
    {
        // There might be a connection without a timer
        // So make sure there is one before trying to cancel it!
        if (isset($this->clients[$conn->resourceId]['timer']))
        {
            if ($this->clients[$conn->resourceId]['timer'] instanceof TimerInterface)
            {
                $this->loop->cancelTimer($this->clients[$conn->resourceId]['timer']);
            }
        }
    
        unset($this->clients[$conn->resourceId]);
    }

    /** Implement all the extra methods the interfaces say that you must use **/
}

That's basically it. The main points here are:

  • Unique token, userid and connection id provide the unique combination required to ensure that one user can't see another user's data.
  • Unique token means that if the same user opens another page and requests to subscribe, they'll have their own connection id + token combo so the same user won't have double the subscriptions on the same page (basically, each connection has it's own individual data).

Extension

You should be ensuring all data is validated and not a hack attempt before you do anything with it. Log all connection attempts using something like Monolog, and set up e-mail forwarding if any critical's occur (like the server stops working because someone is being a bastard and attempting to hack your server).

Closing Points

  • Validate Everything. I can't stress this enough. Your unique token that changes on every request is important.
  • Remember, if you re-generate the token on every HTTP request, and you make a POST request before attempting to connect via websockets, you'll have to pass back the re-generated token to your JavaScript before trying to connect (otherwise your token will be invalid).
  • Log everything. Keep a record of everyone that connects, asks for what topic, and disconnects. Monolog is great for this.
Madigan answered 17/1, 2014 at 16:2 Comment(18)
Late to the party, but this answer really helped me out. Thanks.Callicrates
Using userId in the json is a very bad idea , because you can change the value and subscribe to other people channels . You can instead register a user with his session and append it to an array and then send to that specific userClassify
@Sekai But anything changing client-side won't work, because there will be a mis-match server-side between the userid and the unique token that changes on every request. Server-side, you're checking the unique "one-time" request token against the user id. Did you miss that or did I forget to include it?Madigan
It's ok if you create the token in the server side, but anyways , there is easier way to do it, please read here socketo.me/docs/sessionsClassify
@Sekai Yep, the token is automatically re-generated on each request. The Sessions part is coupled to Symfony2 components unfortunately.Madigan
Pardon me I assumed you are using Symfony , your solution is the best then +1Classify
If you want to add another answer here showing how to use the Sessions part I'll happily upvote you as it's likely to be useful to future users, and maybe me too :-)Madigan
Why are you using a timer to push data to clients? Wouldn't it be a lot more efficient to use something like zeromq to let your script know when to push updates?Parisi
@Parisi How would an application using ZeroMQ know to push the updates? It's still going to have to monitor the resource which, in this case, is a database table (it could be requests over HTTP, for example). Sure, websockets allow bidirectional communication, but not all the resources being read from allow asynchronous communication. There's still going to have to be polling logic to talk to MySQL, or another service. ZeroMQ just does the IPC, doesn't it?Madigan
@Madigan I think the whole premise of zeromq is to avoid having to use polling. I'm still trying to figure out how to use ratchet & zmq myself, but from the examples I've seen the ratchet server script listens on a socket using zmq and then whenever something happens in another script that requires data be pushed to a client you simply use zmq to let the ratchet server know to push. So it completely avoids polling.Parisi
For example, let's say you want to update a comments thread like on here or on facebook. When a user posts a new comment, after adding the comment to the database you'd make a zmq request to let the ratchet server know it needs to push an update to all subscribed clients.Parisi
@Parisi This is useful because there are user controlled events occurring that can be caught by the script and forwarded over ZMQ. So chat is great for this. OnPost -> Update ZeroMQ for everyone. The problem comes where there's a resource out of your control that has no 'hook' for you. For example, I was forwarding torrent data over websockets to the client. The only way to get this data was to, server-side, parse the result of a CLI call for json. True, lose the timer and you can just forward when this differs from the previous call, but this only eliminates the HTTP issue, not resource pollingMadigan
@Madigan I'm struggling with a problem when connecting to my socketserver my authbhan client is throwing error failed: Error in connection establishment: net::ERR_CONNECTION_TIMED_OUTSchiedam
@Madigan Please check this : #31540909Schiedam
You could also do what php does by default and use the cookie ; PHPSESSID. So the first time the page loads you start_sessions() like normal and from then on you can use that cookie to uniquely identify the user. you would also scrap sending the ids across the network and only send this PHPSESSID cookie. If you want to make sure the user is authenticated you could do as ur solution and insert the SESSID as a token beside their ID in a DB or you could even use php's own way of doing it by using a kind of key/value file store. (php xamp stores them in the temp folder). cache maybe an option too.Calder
Thanks for the input Alex - definitely want authentication there so the difference would be storing the session ID in the db which is feasible, just depends on your chosen implementation.Madigan
I've been trying to use this method and it seems Ratchet now expects a URI (example.com/simple) or message with CURIE (event:myevent1). The string gets parsed with getUri(), and if it's json gets corrupted before it gets passed to onSubscribe()Lida
Wow, they've ruined BC then? Perhaps urlencoded GET parameters for the URL might work if they can be accessed on the other side then? It's almost a guarantee cboden has seen this post and chosen not to suggest a better way.Madigan
S
2

To send to specific users, you need a ROUTER-DEALER pattern instead of PUB-SUB. This is explained in the Guide, in chapter 3. Security, if you're using ZMQ v4.0, is handled at the wire level, so you don't see it in the application. It still requires some work, unless you use the CZMQ binding, which provides an authentication framework (zauth).

Basically, to authenticate, you install a handler on inproc://zeromq.zap.01, and respond to requests over that socket. Google ZeroMQ ZAP for the RFC; there is also a test case in the core libzmq/tests/test_security_curve.cpp program.

Selfsustaining answered 29/9, 2013 at 17:54 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.