The best way to use a DB table as a job queue (a.k.a batch queue or message queue)
Z

6

26

I have a database table with ~50K rows in it; each row represents a job that needs to be done. I have a program that extracts a job from the DB, does the job, and puts the result back in the DB. (This system is running right now.)

Now I want to allow more than one processing task to do jobs, while making sure that no job is done twice (this is a performance concern; doing a job twice would not cause other problems). Because the access is by way of a stored procedure, my current thought is to replace said stored procedure with something like this:

update tbl 
set owner = connection_id() 
where available and owner is null limit 1;

select stuff 
from tbl 
where owner = connection_id();

BTW, worker tasks might drop their connection between getting a job and submitting the results. Also, I don't expect the DB to come anywhere close to being the bottleneck unless I mess that part up (~5 jobs per minute).

Are there any issues with this? Is there a better way to do this?

Note: the "Database as an IPC anti-pattern" is only slightly apropos here because

  1. I'm not doing IPC (there is no process generating the rows, they all already exist right now) and
  2. the primary gripe described for that anti-pattern is that it results in unneeded load on the DB as processes wait for messages (in my case, if there are no messages, everything can shut down because everything is done)
Zita answered 17/11, 2008 at 23:0 Comment(3)
Right - bad = synchronous IPC with blocking on a dbms SELECT as a read. You're presumably doing this as a strategy for introducing asynchronicity. - Chromatogram
BTW, if you want to put the reader(s) on a timer, it's useful to have them check infrequently, but if they find work, they can drain the queue before sleeping again. - Chromatogram
Note my edit: if they find no work, they will never find work. But if that were not true... - Zita
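The timer idea from the comments above (poll infrequently, but drain the queue once work shows up) could be sketched roughly like this. This is only an illustration: `run_worker`, `fetch_job`, `handle_job` and the parameters are hypothetical names, not anything from the thread. With `max_idle_polls=1` the worker simply exits the first time it finds the queue empty, which matches the "no work now means no work ever" case in the question.

```python
import time


def run_worker(fetch_job, handle_job, idle_sleep_seconds, max_idle_polls,
               sleep=time.sleep):
    """Poll for work; once a job is found, drain the queue before sleeping again.

    fetch_job() is assumed to return the next job, or None when the queue is empty.
    Returns the number of jobs processed.
    """
    idle_polls = 0
    processed = 0
    while idle_polls < max_idle_polls:
        job = fetch_job()
        if job is None:
            # Nothing to do: count the empty poll and wait before checking again.
            idle_polls += 1
            sleep(idle_sleep_seconds)
            continue
        idle_polls = 0
        # Work was found: keep pulling jobs until the queue is empty.
        while job is not None:
            handle_job(job)
            processed += 1
            job = fetch_job()
    return processed
```

In a real worker, `fetch_job` would be the call to the stored procedure that claims a row.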
C
14

Here's what I've used successfully in the past:

MsgQueue table schema

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL  
SourceCode varchar(20)  -- process inserting the message -- NULLable  
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL 
CreateTime datetime -- default GETDATE() -- NOT NULL  
Msg varchar(255) -- NULLable  

Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).

Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages. Each reading process typically handles a single message type. Multiple instances of a process type can be running for load-balancing.

The reader pulls one message and changes the state to "A"ctive while it works on it. When it's done it changes the state to "C"omplete. It can delete the message or not depending on whether you want to keep the audit trail. Messages of State = 'N' are pulled in MsgType/Timestamp order, so there's an index on MsgType + State + CreateTime.

Variations:
State for "E"rror.
Column for Reader process code.
Timestamps for state transitions.

This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible.


Code from comments:

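CREATE PROCEDURE GetMessage @MsgType VARCHAR(8)
AS
DECLARE @MsgId INT

BEGIN TRAN

-- Pick the oldest queued message of this type. UPDLOCK keeps the row locked
-- until COMMIT; READPAST lets concurrent readers skip rows that are already
-- locked, so two readers cannot pick the same message.
SELECT TOP 1 @MsgId = MsgId
FROM MsgQueue WITH (UPDLOCK, READPAST)
WHERE MsgTypeCode = @MsgType AND State = 'N'
ORDER BY CreateTime

IF @MsgId IS NOT NULL
BEGIN
    -- Mark the message active so no other reader picks it up.
    UPDATE MsgQueue
    SET State = 'A'
    WHERE MsgId = @MsgId

    SELECT MsgId, Msg
    FROM MsgQueue
    WHERE MsgId = @MsgId
END
ELSE
BEGIN
    -- Nothing queued: return a single row of NULLs.
    SELECT MsgId = NULL, Msg = NULL
END

COMMIT TRAN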
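
For anyone who wants to try the N -> A -> C lifecycle without SQL Server, here is a minimal sketch using Python's built-in sqlite3 module. SQLite is only a stand-in, the schema is trimmed, and the function names (`create_queue`, `get_message`, `complete_message`) are hypothetical. Instead of lock hints, the claim is guarded by a conditional UPDATE: a reader only owns the message if its UPDATE actually flipped the row from 'N' to 'A'.

```python
import sqlite3


def create_queue(conn):
    """Create a trimmed-down MsgQueue table plus the index described above."""
    conn.execute("""
        CREATE TABLE MsgQueue (
            MsgId       INTEGER PRIMARY KEY AUTOINCREMENT,
            MsgTypeCode TEXT NOT NULL,
            State       TEXT NOT NULL DEFAULT 'N',
            CreateTime  TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            Msg         TEXT
        )""")
    conn.execute(
        "CREATE INDEX IX_MsgQueue ON MsgQueue (MsgTypeCode, State, CreateTime)")


def get_message(conn, msg_type):
    """Claim the oldest 'N' message of msg_type; return (MsgId, Msg) or None."""
    with conn:  # commits on success, rolls back if an exception is raised
        row = conn.execute(
            "SELECT MsgId, Msg FROM MsgQueue "
            "WHERE MsgTypeCode = ? AND State = 'N' "
            "ORDER BY CreateTime, MsgId LIMIT 1",
            (msg_type,)).fetchone()
        if row is None:
            return None
        # Conditional update: succeeds only if nobody claimed the row meanwhile.
        cur = conn.execute(
            "UPDATE MsgQueue SET State = 'A' WHERE MsgId = ? AND State = 'N'",
            (row[0],))
        return row if cur.rowcount == 1 else None


def complete_message(conn, msg_id):
    """Mark an active message as completed."""
    with conn:
        conn.execute(
            "UPDATE MsgQueue SET State = 'C' WHERE MsgId = ? AND State = 'A'",
            (msg_id,))
```

If `get_message` returns None because another reader won the race, the caller can simply call it again.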
Chromatogram answered 17/11, 2008 at 23:31 Comment(6)
The part described as "The reader pulls one message and changes the state to "A"ctive while it works on it." is the part I'm interested in. How do you do that bit? (Aside from that, it looks like mine is the same as yours without the stuff that isn't needed for my case.) - Zita
Right, that requires multiple SQL statements between BEGIN TRAN and COMMIT TRAN. Immediately following - an SP for pulling the next message - hacked up a bit, I've omitted error trapping since it was written pre-TRY/CATCH. - Chromatogram
-- PART 1 CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) AS DECLARE @MsgId INT BEGIN TRAN SELECT TOP 1 @MsgId = MsgId FROM MsgQueue WHERE MsgTypeCode = @MsgType AND State = 'N' ORDER BY CreateTime - Chromatogram
-- PART 2 IF @MsgId IS NOT NULL BEGIN UPDATE MsgQueue SET State = 'A' WHERE MsgId = @MsgId SELECT MsgId, Msg FROM MsgQueue WHERE MsgId = @MsgId END ELSE BEGIN SELECT MsgId = NULL, Msg = NULL END COMMIT TRAN - Chromatogram
What if I have to select more than one row at a time? Can I update them all at the same time? - Lorianne
Assuming you mark them with a common timestamp, or selection-batch id, you can update them all in a single statement, yes. Or use the "A" state described above, and update where state = 'A'. - Chromatogram
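
The selection-batch id idea from the last comment might look something like the sketch below. It uses Python's sqlite3 purely as a stand-in database; the `BatchId` column and the `create_queue` / `claim_batch` names are assumptions made for the example, not part of the schema in the answer. One UPDATE stamps up to `batch_size` queued rows with a fresh batch id, and the reader then fetches exactly the rows carrying that id.

```python
import sqlite3
import uuid


def create_queue(conn):
    """Trimmed-down MsgQueue with a hypothetical BatchId column."""
    conn.execute("""
        CREATE TABLE MsgQueue (
            MsgId      INTEGER PRIMARY KEY AUTOINCREMENT,
            State      TEXT NOT NULL DEFAULT 'N',
            BatchId    TEXT,
            CreateTime TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            Msg        TEXT
        )""")


def claim_batch(conn, batch_size):
    """Mark up to batch_size queued rows as active in one statement.

    Returns (batch_id, rows), where rows is a list of (MsgId, Msg) tuples.
    """
    batch_id = uuid.uuid4().hex
    with conn:  # one transaction for the claim and the read-back
        conn.execute(
            "UPDATE MsgQueue SET State = 'A', BatchId = ? "
            "WHERE MsgId IN (SELECT MsgId FROM MsgQueue WHERE State = 'N' "
            "                ORDER BY CreateTime, MsgId LIMIT ?)",
            (batch_id, batch_size))
        rows = conn.execute(
            "SELECT MsgId, Msg FROM MsgQueue WHERE BatchId = ? ORDER BY MsgId",
            (batch_id,)).fetchall()
    return batch_id, rows
```

When the work is finished, a single `UPDATE ... SET State = 'C' WHERE BatchId = ?` can complete the whole batch.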
C
42

The best way to implement a job queue in a relational database system is to use SKIP LOCKED.

SKIP LOCKED is a lock acquisition option that applies to both read/share (FOR SHARE) and write/exclusive (FOR UPDATE) locks and is widely supported nowadays:

  • Oracle 10g and later
  • PostgreSQL 9.5 and later
  • SQL Server 2005 and later (through the READPAST table hint)
  • MySQL 8.0 and later

Now, consider we have the following post table:

[Figure: the post table, with the columns id, title, body, and status]

The status column is used as an Enum, having the values of:

  • PENDING (0),
  • APPROVED (1),
  • SPAM (2).

If we have multiple concurrent users trying to moderate the post records, we need a way to coordinate their efforts to avoid having two moderators review the same post row.

So, SKIP LOCKED is exactly what we need. Suppose two concurrent users, Alice and Bob, execute the following SELECT queries, which lock the post records exclusively and also add the SKIP LOCKED option:

[Alice]:
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
 
[Bob]:
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

We can see that Alice selects the first two entries while Bob selects the next two. Without SKIP LOCKED, Bob's lock acquisition request would block until Alice releases the locks on the first two records.
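
To make the Alice and Bob behaviour concrete without a database, here is a toy in-memory model in Python. It is not how any database implements SKIP LOCKED, and the names (`Row`, `select_for_update_skip_locked`, `release_locks`) are made up for the illustration. Each row carries its own lock, and the "query" takes row locks without waiting, skipping any row that someone else already holds.

```python
import threading


class Row:
    """A post row with a status (0 = PENDING) and its own row-level lock."""

    def __init__(self, row_id, status=0):
        self.id = row_id
        self.status = status
        self.lock = threading.Lock()


def select_for_update_skip_locked(rows, limit):
    """Lock and return up to `limit` pending rows in id order.

    Rows that are already locked are skipped instead of waited on.
    """
    claimed = []
    for row in sorted(rows, key=lambda r: r.id):
        if len(claimed) == limit:
            break
        # Non-blocking acquire: returns False at once if the row is locked.
        if not row.lock.acquire(blocking=False):
            continue
        if row.status == 0:
            claimed.append(row)
        else:
            row.lock.release()  # not pending, so give the lock back
    return claimed


def release_locks(claimed):
    """Stand-in for COMMIT or ROLLBACK: release every lock that was taken."""
    for row in claimed:
        row.lock.release()
```

With four pending rows, a first call with `limit=2` locks rows 1 and 2, and a second call made before the first releases its locks gets rows 3 and 4 rather than blocking.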

Crosseye answered 16/4, 2019 at 7:4 Comment(0)
A
0

Instead of having owner = null when a row isn't owned, you should set it to a fake "nobody" record. Searching for null cannot use the index, so you might end up with a table scan. (This is for Oracle; SQL Server might be different.)

Atrioventricular answered 17/11, 2008 at 23:13 Comment(0)
A
0

Just as a possible technology change, you might consider using MSMQ or something similar.

Each of your jobs / threads could query the message queue to see if a new job is available. Because the act of reading a message removes it from the queue, you are assured that only one job / thread gets the message.

Of course, this is assuming you are working with a Microsoft platform.

Ampersand answered 17/11, 2008 at 23:55 Comment(1)
I have the data in the DB, when I'm done I need the data in the DB. In my case I see no reason to add another component to the system. (BTW microsoft.com/windowsserver2003/technologies/msmq/default.mspx) - Zita
T
0

See Vlad's answer for context; I'm just adding the equivalent in Oracle because there are a few "gotchas" to be aware of.

The

SELECT * FROM t order by x limit 2 FOR UPDATE OF t SKIP LOCKED

will not translate directly to Oracle in the way you might expect. If we look at a few options of translation, we might try any of the following:

SQL> create table t as
  2   select rownum x
  3   from dual
  4   connect by level <= 100;

Table created.

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x for update skip locked fetch first 2 rows only;
  5  end;
  6  /
  open rc for select * from t order by x for update skip locked fetch first 2 rows only;
                                                                *
ERROR at line 4:
ORA-06550: line 4, column 65:
PL/SQL: ORA-00933: SQL command not properly ended
ORA-06550: line 4, column 15:
PL/SQL: SQL Statement ignored

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x fetch first 2 rows only for update skip locked ;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

or perhaps try falling back to the ROWNUM option

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from ( select * from t order by x ) where rownum <= 10 for update skip locked;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

And you won't get any joy. You need to control the fetching of the "n" rows yourself, so you can code up something like:

SQL> declare
  2    rc sys_refcursor;
  3    res1 sys.odcinumberlist := sys.odcinumberlist();
  4  begin
  5    open rc for select * from t order by x for update skip locked;
  6    fetch rc bulk collect into res1 limit 10;
  7  end;
  8  /

PL/SQL procedure successfully completed.
Therron answered 26/10, 2020 at 9:23 Comment(0)
L
-6

You are trying to implement the "Database as IPC" antipattern. Look it up to understand why you should consider redesigning your software properly.

Libb answered 17/11, 2008 at 23:6 Comment(5)
How do you know it's an antipattern in this case, or that the software design is improper? You don't have any context on which to base this comment whatsoever. - Subfusc
I'd call it a useful pattern for asynchronous IPC. You can configure it to operate like any garden-variety message queue, and they aren't in my experience branded "antipatterns". - Chromatogram
Here's a reference to the antipattern: tripatlas.com/Database_as_an_IPC. The difference is that we're discussing using the database as a message queue, not as a mechanism for processes to interoperate. - Chromatogram
Using a database as a message queue is an anti-pattern. You're going to get lock contention up the ying-yang, and if you're using an MVCC system with multiple workers you're going to end up with nebulous state for any record. You should use a message queue broker like RabbitMQ. - Chery
@Chery Not really, because using a message broker means that you need to handle distributed transactions. That's why people use the outbox pattern, and thus a database. - Inexplicable
