PostgreSQL - implementing a reliable queue

I am trying to implement a reliable queue with multiple writers and multiple readers using a PostgreSQL database. How can I avoid missing rows when a queue reader scans the table and transactions that were still in progress commit after it has read?

We have a reader that selects rows in batches using a “checkpoint” time: each batch fetches the rows with a timestamp later than the last timestamp of the previous batch. With this scheme we are missing rows. (Reason: the timestamp is set when the INSERT happens, say at 00.00.00. Under heavy load, if the transaction takes longer, the row (row1) only becomes visible when it commits, say 10 seconds later at 00.00.10. If the reader runs during those 10 seconds and finds a row whose INSERT time (00.00.05) is later than row1's, it moves its checkpoint past row1 and never reads it. The complete description of the problem is similar to the one in this blog post: http://blog.thefourthparty.com/stopping-time-in-postgresql/)
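
The race described above can be reproduced by hand with three psql sessions. This is only an illustrative sketch: the table `events`, its columns, and the literal timestamps are assumptions standing in for the real schema, and the timestamps are written out explicitly so the timeline is easy to follow.

```sql
-- Hypothetical table for illustration; not the schema from the question.
CREATE TABLE events (
    id         bigserial PRIMARY KEY,
    created_at timestamptz NOT NULL,
    payload    text
);

-- Session A (slow writer): inserts at 00:00:00 but keeps its transaction open.
BEGIN;
INSERT INTO events (created_at, payload)
VALUES ('2015-10-05 00:00:00+00', 'row1');

-- Session B (fast writer): inserts and commits at 00:00:05 (autocommit).
INSERT INTO events (created_at, payload)
VALUES ('2015-10-05 00:00:05+00', 'row2');

-- Session R (reader): polls at 00:00:06 with no checkpoint yet.
-- row1 is still uncommitted, so under READ COMMITTED only row2 is visible.
SELECT id, created_at, payload
FROM   events
WHERE  created_at > '-infinity'
ORDER  BY created_at;
-- The reader now stores 00:00:05 as its checkpoint.

-- Session A: finally commits at 00:00:10.
COMMIT;

-- Session R: the next poll uses the stored checkpoint.
-- row1 carries 00:00:00, which is not after the checkpoint, so it is never returned.
SELECT id, created_at, payload
FROM   events
WHERE  created_at > '2015-10-05 00:00:05+00'
ORDER  BY created_at;
```

Any checkpoint value that is assigned before commit (a timestamp or a sequence number) can show this behaviour, because commit order and assignment order are independent.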

Related prior question for context: Postgres LISTEN/NOTIFY - low latency, realtime?

Update: I have changed the question from a single reader to multiple readers. The order in which a reader reads the rows does matter.

Hallie answered 5/10, 2015 at 10:54 Comment(6)
https://mcmap.net/q/371407/-job-queue-as-sql-table-with-multiple-consumers-postgresql/330315 and https://mcmap.net/q/751463/-queue-in-php-and-postgres/330315 might help. You might also want to look into this: pgxn.org/dist/pg_message_queue - Alfeus
Is it crucial that all rows are processed in order? If so, how exactly is the order defined? Or do you just want to avoid missing rows? In that case a solution like the one presented here should work: Postgres Update, limit 1 - Debauch
Is it really required that you do this with PostgreSQL? This is the sort of requirement that can very easily be fulfilled by Redis. - Tableland
@Tableland Yes, it is. We are currently using PostgreSQL and we are experiencing the issue I described in the question at higher loads. - Hallie
@ErwinBrandstetter I believe they have to be in order of their timestamp. If the order gets messed up, we have missing rows. Also, the link you suggested looks like it involves locking rows, which I am afraid would drastically affect throughput. - Hallie
I am not saying to chuck out PostgreSQL altogether (I swear by it myself), but this is one situation where a database isn't the practical solution. In our systems we use Redis and PostgreSQL together, and lots of other people do so too. - Tableland
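
For reference, a common form of the row-locking approach mentioned in the comments above is sketched below. The table `job_queue` and its columns are hypothetical, and `SKIP LOCKED` needs PostgreSQL 9.5 or later. Each consumer claims the oldest unclaimed row without waiting on rows that other consumers have locked, so rows are not missed, but with several consumers the processing order across consumers is not guaranteed.

```sql
-- Hypothetical job table for illustration.
CREATE TABLE job_queue (
    id      bigserial PRIMARY KEY,
    payload text    NOT NULL,
    taken   boolean NOT NULL DEFAULT false
);

-- Claim one job: lock the oldest unclaimed row, skipping rows
-- that are currently locked by another consumer's transaction.
UPDATE job_queue
SET    taken = true
WHERE  id = (
    SELECT id
    FROM   job_queue
    WHERE  NOT taken
    ORDER  BY id
    LIMIT  1
    FOR    UPDATE SKIP LOCKED
)
RETURNING id, payload;
```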

With multiple readers, you need to keep track of which records each reader has already received.

It has also been said that order is a condition for sending records to a reader. So, if a later transaction commits before an earlier one, we have to "stop" and only resume sending records once the earlier one has committed, to maintain the order of the records sent to the reader.

That said, here is an implementation:

-- let's create our queue table
drop table if exists queue_records cascade;
create table if not exists queue_records 
(
  cod serial primary key,
  -- clock_timestamp() is the wall-clock time of the insert, not the transaction start time
  date_posted timestamp default clock_timestamp(),
  message text
);


-- let's create a table to save "checkpoints" per reader_id
drop table if exists queue_reader_checkpoint cascade;
create table if not exists queue_reader_checkpoint 
(
  reader_id text primary key,
  last_checkpoint numeric
);



CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text)
RETURNS SETOF queue_records AS
$BODY$
DECLARE
    vLAST_CHECKPOINT    numeric;
    vCHECKPOINT_EXISTS  integer;
    vRECORD         queue_records%rowtype;
BEGIN

    -- let's get the last record sent to the reader 
    SELECT  last_checkpoint
    INTO    vLAST_CHECKPOINT
    FROM    queue_reader_checkpoint
    WHERE   reader_id = pREADER_ID;

    -- if vLAST_CHECKPOINT is null (this is the very first call for this reader_id),
    -- set it to the last cod in the queue, so the reader gets records from now on.
    if (vLAST_CHECKPOINT is null) then
        -- flag that the reader does not have a checkpoint recorded yet
        vCHECKPOINT_EXISTS = 0;
        -- get the very last committed record; COALESCE covers an empty queue, where MAX(cod)
        -- is null and a null checkpoint would make the next call insert a duplicate reader_id
        SELECT  COALESCE(MAX(cod), 0)
        INTO    vLAST_CHECKPOINT
        FROM    queue_records;
    else
        -- flag that the reader already has a checkpoint recorded
        vCHECKPOINT_EXISTS = 1;
    end if;

    -- now let's get the records from the queue one-by-one 
    FOR vRECORD IN 
            SELECT  *
            FROM    queue_records
            WHERE   COD > vLAST_CHECKPOINT 
            ORDER   BY COD
    LOOP

        -- if the next record equals (vLAST_CHECKPOINT+1), it is in the expected order
        if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then

            -- let's save the last record read
            vLAST_CHECKPOINT = vRECORD.COD;

            -- and return it
            RETURN NEXT vRECORD;

        -- but if it is not, the records are out of order
        else
            -- the reason is that some transaction has not committed yet, while a later transaction already has.
            -- so we must stop sending records to the reader; by the next call, the pending transaction will probably have committed.
            exit;
        end if;
    END LOOP;


    -- now we have to persist the last record read to be retrieved on next call
    if (vCHECKPOINT_EXISTS = 0) then
        INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT);
    else        
        UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID;
    end if; 
end;
$BODY$ LANGUAGE plpgsql VOLATILE;
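
A short usage sketch for the function above, assuming both tables were freshly created (so cod starts at 1) and using a made-up reader id 'reader_1'. It also shows one caveat worth checking before relying on this approach: sequence values are not reused after a rollback, so a rolled-back insert leaves a permanent gap in cod and the reader stops there.

```sql
-- One record exists before the reader registers.
INSERT INTO queue_records (message) VALUES ('already there');   -- cod = 1

-- First call: registers 'reader_1' at the current end of the queue, returns no rows.
SELECT * FROM get_queue_records('reader_1');

-- Two new records arrive.
INSERT INTO queue_records (message) VALUES ('first'), ('second');   -- cod = 2, 3

-- Second call: returns cod 2 and 3 in order and advances the checkpoint to 3.
SELECT cod, message FROM get_queue_records('reader_1');

-- Caveat: a rolled-back insert still consumes a sequence value.
BEGIN;
INSERT INTO queue_records (message) VALUES ('never committed');   -- takes cod = 4
ROLLBACK;

INSERT INTO queue_records (message) VALUES ('after the gap');      -- cod = 5

-- cod 4 will never appear, so the loop exits at cod 5 and no rows are returned,
-- on this call and on every later call.
SELECT cod, message FROM get_queue_records('reader_1');

-- The checkpoint is still 3.
SELECT * FROM queue_reader_checkpoint;
```

A production version would need some way to tell a gap caused by a rollback from one caused by a transaction that is still in progress.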
Tobitobiah answered 9/10, 2015 at 20:52 Comment(6)
I do need to have some "checkpoint". Are you suggesting I use "cod" for that? Also, the order will be completely messed up. - Hallie
I think in @Chandra's use case, there are multiple readers all reading the same table, but maybe at different times and at different speeds. I'm not clear how cod will help his use case. - Coupler
@ShintaSmith: I don't think so. The question clearly says: "...with multiple writers and a single reader...". And "cod" was introduced just to help order the records correctly, as current_timestamp returns the time at the beginning of the transaction. But "cod" isn't the basis of the solution. It's the "unread" flag! - Tobitobiah
@Chandra: Is order a condition for reading further records? Or can we just get all committed records? - Tobitobiah
@ChristianB.Almeida Sorry for the confusion. I was trying to keep the use case simple, but that may have skewed the question a little. I do have multiple readers, and I believe an unread flag will not work in that scenario. - Hallie
@ChristianB.Almeida In any case, order does matter to the reader. These are events that happen in a certain order. - Hallie

If installing a PostgreSQL extension is an option for you (it requires admin access), you can consider using the open-source PGMQ project: https://github.com/tembo-io/pgmq

Features (taken from the git-repo):

  • Lightweight - No background worker or external dependencies, just Postgres functions packaged in an extension
  • Guaranteed "exactly once" delivery of messages to a consumer within a visibility timeout
  • API parity with AWS SQS and RSMQ
  • Messages stay in the queue until explicitly removed
  • Messages can be archived, instead of deleted, for long-term retention and replayability

Still, if you are not running PostgreSQL yourself (ideally as a Docker container), this is most likely not an option.
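
A minimal sketch of what using the extension can look like, assuming it is already installed on the server. The function names follow the project's README at the time of writing and live in the `pgmq` schema; older releases used different names, so check them against the installed version. The queue name 'my_queue', the payloads, and the message ids are placeholders.

```sql
-- Assumes the pgmq extension is available on the server.
CREATE EXTENSION IF NOT EXISTS pgmq;

-- Create a queue.
SELECT pgmq.create('my_queue');

-- Send two messages (jsonb payloads); each call returns the new message id.
SELECT * FROM pgmq.send('my_queue', '{"event": "first"}'::jsonb);
SELECT * FROM pgmq.send('my_queue', '{"event": "second"}'::jsonb);

-- Read up to 2 messages and hide them from other consumers for 30 seconds
-- (the visibility timeout). Unprocessed messages become visible again afterwards.
SELECT * FROM pgmq.read('my_queue', 30, 2);

-- After processing, either delete a message or archive it for retention.
SELECT pgmq.delete('my_queue', 1);
SELECT pgmq.archive('my_queue', 2);
```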

Chuckle answered 25/3, 2024 at 10:13 Comment(2)
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. - Canaanite
While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes. - From Review - Pyuria

© 2022 - 2025 — McMap. All rights reserved.