We actually do this in many projects, even WordPress CLI tools, and we use this library to do it along with this for the transport. It doesn't require Symfony and can work with most queue systems that follow the general standard.
The general idea is that you want something (probably a singleton) to return an instance of Interop\Queue\Context
, and here's what we use:
function createContext(): \Interop\Queue\Context
{
$factory = new \Enqueue\Dbal\DbalConnectionFactory(
sprintf(
'mysql://%1$s:%2$s@%3$s/%4$s',
DB_USER,
DB_PASSWORD,
DB_HOST,
DB_NAME
)
);
$context = $factory->createContext();
$context->createDataBaseTable();
return $context;
}
You'll also want something to handle each message, and you'll want to pass the message and consumer to it:
function handleMessage($message, $consumer)
{
// Business logic here
if($business_logic_failed) {
$context = createContext();
$failed_queue = $context->createQueue('FAILED_QUEUE_HERE');
$context->createProducer()->send($failed_queue, $message);
} else {
$consumer->acknowledge($message);
}
}
Then to use it:
$context = createContext();
$queue = $context->createQueue('QUEUE_NAME_HERE');
$consumer = $context->createConsumer($queue);
// This can be an infinite loop, or a loop for 10 messages and exit, whatever your logic
while(true) {
// This command will block unless you pass a timeout, so no sleep is needed
$message = $consumer->receive(/* optional timeout here */);
handleMessage($message, $consumer);
// Do whatever you want with message
}
Sprinkle a lot of try/catch around that, too, and make sure that no matter what you acknowledge or fail the message in some way.