Swoole with RabbitMQ

I'm trying to send some data from a PHP application to the user's browser using WebSockets, so I've decided to use Swoole in combination with RabbitMQ.

This is my first time working with WebSockets. After reading some posts about Socket.IO, Ratchet, etc., I've settled on Swoole because it's written in C and convenient to use from PHP.

This is how I understand the idea of transferring data over WebSockets:
1) Start the RabbitMQ worker and the Swoole server from the CLI.
2) The PHP application sends data to RabbitMQ.
3) RabbitMQ delivers a message with the data to the worker.
4) The worker receives the message and establishes a socket connection with the Swoole socket server.
5) The Swoole server broadcasts the data to all connections.

The question is: how do I bind the Swoole socket server to RabbitMQ? Or how do I make the RabbitMQ worker establish a connection with Swoole and send data to it?

Here is the code:

Swoole server (swoole_server.php)

$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);

$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
    echo "received message: {$frame->data}\n";
    $server->push($frame->fd, json_encode(["hello", "world"]));
});

$server->on('close', function($server, $fd)
{
    echo "connection close: {$fd}\n";
});

$server->start();

Worker that receives a message from RabbitMQ, then connects to Swoole and sends the message over the socket connection (worker.php)

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);


    // Here I'm trying to connect to the Swoole server and send data
    $cli = new \swoole_http_client('0.0.0.0', 2345);

    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });

    $cli->upgrade('/', function($cli)
    {
        $cli->push('This is the message to send to Swoole server');
        $cli->close();
    });
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

New task that sends the message to RabbitMQ (new_task.php):

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

After starting both the Swoole server and the worker, I run new_task.php from the command line:

php new_task.php

In the terminal where the RabbitMQ worker (worker.php) is running, I can see that the message is delivered to the worker (the "[x] Received Hello World!" message appears).

However, nothing happens in the terminal where the Swoole server is running.

So the questions are:
1) Is the idea behind this approach right?
2) What am I doing wrong?

Idolatry answered 12/3, 2018 at 0:42 Comment(7)
I've never used Swoole, but I have used RabbitMQ. From a cursory look at their documentation, it seems like you are missing a few things (github.com/swoole/swoole-src/blob/master/examples/…), such as $cli->setData(...) and $cli->execute(...). - Deformation
I would suggest breaking things down. For example, write a command-line PHP script to test sending a simple message to Swoole, then once you get that working, integrate the RabbitMQ part. That way you can isolate the issues. - Deformation
I've already done that. I can open a socket connection using Swoole and send data in both directions. I can also send a message to the worker using RabbitMQ. So the only problem I have is using RabbitMQ and Swoole together. - Idolatry
Make sure your worker has the proper permissions then; that's the only thing I can think of. Perhaps as root you can connect to Swoole but the worker cannot. There should be no difference if the code works separately, so it must be the environment. - Deformation
A strange thing I've noticed. The steps I'm doing: 1) Start the Swoole server 2) Start the worker 3) Execute new_task.php 4) Get the message in worker.php 5) Nothing to see in swoole_server.php 6) Manually stop worker.php in its terminal 7) In the terminal window of swoole_server.php I get the message "connection close: 1". - Idolatry
So it seems the connection is being established, but no 'message' event is triggered in $cli->on('message', function ($_cli, $frame) - Idolatry
@ArtisticPhoenix, the worker is connected with the username "guest". I've checked the permissions with the command "sudo rabbitmqctl list_user_permissions guest", and the result is: / .* .* .* - Idolatry

In the callback (in worker.php) that fires when a message is received, you're using swoole_http_client, which is async only. This seems to result in the code never being fully executed, because the callback function returns before the async code is triggered.

A synchronous method of doing the same thing will solve the problem. Here is a simple example:

$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();

Check out the WebSocketClient class and its example usage on GitHub.
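To show where the synchronous client would go in the question's worker.php, here is a minimal sketch rather than a tested implementation. It assumes php-amqplib is installed via Composer and that the WebSocketClient class from the Swoole examples has been saved next to the worker as WebSocketClient.php (that path is hypothetical). It uses 127.0.0.1 instead of 0.0.0.0 as the client-side address, and it skips recv() after sending, on the assumption that the worker only needs to deliver the message.

```php
<?php
// worker.php (sketch). The two require paths are assumptions about your layout.
require __DIR__ . '/vendor/autoload.php';
require __DIR__ . '/WebSocketClient.php'; // copied from the Swoole examples

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";

    // Blocking client: every call finishes before the next line runs,
    // so the message has been handed to Swoole before the callback returns.
    $client = new WebSocketClient('127.0.0.1', 2345);
    $client->connect();
    $client->send($msg->body);
    $client->close();

    // Acknowledge only after the message was forwarded.
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
```

Opening a new WebSocket connection per message keeps the sketch simple; a long-running worker might prefer to keep one connection open and reconnect on failure.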

You can also wrap it in a coroutine, like this:

go(function () {
    $client = new WebSocketClient('0.0.0.0', 2345);
    $client->connect();
    $client->send('This is the message to send to Swoole server');
    $recv = $client->recv();
    print_r($recv);
    $client->close();
});
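
Note that the server in the question only replies to the sender, while step 5 of the question asks for a broadcast. A possible sketch of a broadcasting 'message' handler is shown below. It is an illustration under assumptions, not the asker's code: it limits the server to one worker process (worker_num set to 1), because as far as I know each worker in SWOOLE_BASE mode only sees its own connections, and it pushes to every established connection, including the one that sent the message.

```php
<?php
// swoole_server.php (sketch): forward each incoming frame to all clients.
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;

$server = new Server('0.0.0.0', 2345, SWOOLE_BASE);

// Assumption: a single worker, so that $server->connections covers every client.
$server->set(['worker_num' => 1]);

$server->on('open', function (Server $server, $req) {
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function (Server $server, Frame $frame) {
    echo "received message: {$frame->data}\n";

    // $server->connections iterates over the file descriptors of open connections.
    foreach ($server->connections as $fd) {
        // Skip connections that have not completed the WebSocket handshake.
        if ($server->isEstablished($fd)) {
            $server->push($fd, $frame->data);
        }
    }
});

$server->on('close', function (Server $server, $fd) {
    echo "connection close: {$fd}\n";
});

$server->start();
```

With this handler, the worker's own connection also receives a copy of the message, so a client that calls recv() after send() still gets a reply.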
Heinrik answered 21/5, 2018 at 19:46 Comment(0)
