How do I use mqueue in a c program on a Linux based system?
Asked Answered
D

4

49

How do I use mqueue (POSIX message queues) in a C program on a Linux-based system?

I'm looking for some good code examples that can show how this is done in a correct and proper way, maybe a howto.

Dogfight answered 16/6, 2010 at 18:57 Comment(0)
S
83

The following is a simple example of a server that receives messages from clients until it receives an "exit" message telling it to stop.

The code for the server:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include <mqueue.h>

#include "common.h"

/*
 * Server: creates QUEUE_NAME, then prints every message it receives until
 * one that starts with MSG_STOP arrives, and finally removes the queue.
 *
 * Any failing mqueue call terminates the process through CHECK().
 * Returns 0 after a clean shutdown.
 */
int main(int argc, char **argv)
{
    mqd_t mq;
    struct mq_attr attr;
    char buffer[MAX_SIZE + 1];  /* +1 leaves room for the terminator added below */
    const size_t stop_len = strlen(MSG_STOP);
    int must_stop = 0;

    (void)argc;
    (void)argv;

    /* initialize the queue attributes */
    attr.mq_flags = 0;
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = MAX_SIZE;
    attr.mq_curmsgs = 0;

    /* create the message queue */
    mq = mq_open(QUEUE_NAME, O_CREAT | O_RDONLY, 0644, &attr);
    CHECK((mqd_t)-1 != mq);

    do {
        ssize_t bytes_read;

        /* receive the message */
        bytes_read = mq_receive(mq, buffer, MAX_SIZE, NULL);
        CHECK(bytes_read >= 0);

        /* the payload is raw bytes; terminate it before treating it as a string */
        buffer[bytes_read] = '\0';

        /* prefix match, so a trailing newline after MSG_STOP still stops the server */
        if (strncmp(buffer, MSG_STOP, stop_len) == 0)
        {
            must_stop = 1;
        }
        else
        {
            printf("Received: %s\n", buffer);
        }
    } while (!must_stop);

    /* cleanup: mq_close() and mq_unlink() return an int, not an mqd_t */
    CHECK(mq_close(mq) != -1);
    CHECK(mq_unlink(QUEUE_NAME) != -1);

    return 0;
}

The code for the client:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <mqueue.h>

#include "common.h"


/*
 * Client: reads lines from stdin and sends each one to the queue QUEUE_NAME.
 *
 * The loop ends after a line starting with MSG_STOP has been sent, or when
 * stdin reaches end-of-file. Any failing mqueue call terminates the process
 * through CHECK(). Returns 0 on a clean exit.
 */
int main(int argc, char **argv)
{
    mqd_t mq;
    char buffer[MAX_SIZE];
    const size_t stop_len = strlen(MSG_STOP);

    (void)argc;
    (void)argv;

    /* open the message queue; it must already exist (no O_CREAT here) */
    mq = mq_open(QUEUE_NAME, O_WRONLY);
    CHECK((mqd_t)-1 != mq);

    printf("Send to server (enter \"exit\" to stop it):\n");

    do {
        printf("> ");
        fflush(stdout);

        /* end-of-file or a read error: stop instead of resending stale data forever */
        if (fgets(buffer, sizeof buffer, stdin) == NULL)
        {
            break;
        }

        /* drop the newline kept by fgets so only the typed text is sent */
        buffer[strcspn(buffer, "\n")] = '\0';

        /* send only the bytes of the message, not the whole buffer */
        CHECK(0 <= mq_send(mq, buffer, strlen(buffer), 0));

    } while (strncmp(buffer, MSG_STOP, stop_len) != 0);

    /* cleanup: mq_close() returns an int, not an mqd_t */
    CHECK(mq_close(mq) != -1);

    return 0;
}

The common header:

#ifndef COMMON_H_
#define COMMON_H_

/* CHECK() expands to fprintf/perror/exit, so their declarations are needed here. */
#include <stdio.h>
#include <stdlib.h>

#define QUEUE_NAME  "/test_queue"   /* queue name used by both server and client */
#define MAX_SIZE    1024            /* message size limit, in bytes */
#define MSG_STOP    "exit"          /* message that tells the server to stop */

/*
 * CHECK(x): terminate the program when the condition x is false.
 *
 * On failure it prints "<function>:<line>: " to stderr, followed by the text
 * of the failed expression and the message for the current errno (via
 * perror), then exits with status -1. x is evaluated exactly once.
 *
 * The do { ... } while (0) wrapper has no trailing line continuation, so the
 * macro ends on its last line and can be used as a single statement,
 * including in an unbraced if/else.
 */
#define CHECK(x) \
    do { \
        if (!(x)) { \
            fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
            perror(#x); \
            exit(-1); \
        } \
    } while (0)

#endif /* #ifndef COMMON_H_ */

Compiling:

gcc -o server server.c -lrt
gcc -o client client.c -lrt
Shamus answered 16/6, 2010 at 20:39 Comment(7)
One short remark. Your code for the client is missing the following includes to make it compile:#include <stdio.h> #include <string.h> #include <stdlib.h>Soelch
Sweet, I'm loving your CHECK macro.Per
I'm sure I'm not understanding something right, but aren't message queues supposed to be asynchronous? Why does the client barf an error and exits if the server isn't available? As far as my (probably wrong) understanding goes, the whole point of message queues is to allow clients to write to unattended queues -- or else, what's the real difference between mqueues and FIFO? What am I misunderstanding here? Have you noticed I'm asking a lot of questions?Ideomotor
@Gutza I'd just replace the words client/server by producer/consumer in this case. The queue is always available through the API, the OS will keep it safe until someone consumes that data.Higher
@clarete, well, I was using the_void's terminology; also, while your assertion is correct in the general case, the_void's code doesn't allow the client/producer to write to an unattended queue (even though the library would allow it). The answer I ended up with after further consideration was that for some reason the_void "needed" this to be the case in this particular implementation: s/he could've chosen to push data to the queue regardless of whether there was a consumer active on the other end or not, but s/he simply chose not to.Ideomotor
What is '0644' in mq_opening line?Loadstar
on some systems you need to #include <fcntl.h> to use O_WRONLY and other definitionsNewcomen
C
1
#include <stdio.h>
#include <fcntl.h>
#include <mqueue.h>

/*
 * Round-trips one int through a POSIX message queue within a single process:
 * creates the queue, sends the contents of `a`, receives them into `b`,
 * then closes and removes the queue.
 *
 * Prints a and b before and after the transfer.
 * Returns 0 on success, 1 if any step failed.
 */
int main(int argc, char *argv[])
{
    /* One name for both mq_open and mq_unlink; POSIX queue names start with '/'. */
    static const char queue_name[] = "/test_queue";
    mqd_t mq;               // message queue
    struct mq_attr ma;      // message queue attributes
    int status = 0;
    int a = 5;
    int b = 0;

    (void)argc;
    (void)argv;

    printf("a = %d, b = %d\n", a, b);

    // Specify message queue attributes.
    ma.mq_flags = 0;                // blocking read/write
    ma.mq_maxmsg = 10;              // queue capacity; Linux's default limit for unprivileged processes is 10
    ma.mq_msgsize = sizeof a;       // messages are contents of an int
    ma.mq_curmsgs = 0;              // number of messages currently in queue

    // Create the message queue.
    mq = mq_open(queue_name, O_RDWR | O_CREAT, 0700, &ma);

    // (mqd_t)-1 indicates an error.
    if (mq == (mqd_t)-1)
    {
        printf("Failed to create queue.\n");
        status = 1;
    }

    // mq_send returns 0 on success and -1 on error.
    if ((status == 0) && (mq_send(mq, (const char *)&a, sizeof a, 1) == -1))
    {
        printf("Failed to send message.\n");
        status = 1;
    }

    // mq_receive returns the number of bytes received on success, so it must
    // not be stored in status: only -1 signals an error.
    if ((status == 0) && (mq_receive(mq, (char *)&b, sizeof b, NULL) == -1))
    {
        printf("Failed to receive message.\n");
        status = 1;
    }

    // Release the queue whenever it was opened, even after a failed
    // send/receive, so it does not linger in the system.
    if (mq != (mqd_t)-1)
    {
        if (mq_close(mq) == -1)
        {
            printf("Error closing message queue.\n");
            status = 1;
        }

        if (mq_unlink(queue_name) == -1)
        {
            printf("Error deleting message queue.\n");
            status = 1;
        }
    }

    printf("a = %d, b = %d\n", a, b);

    return status;
}
Calabro answered 16/6, 2010 at 19:27 Comment(6)
There is something very wrong with your implementation. It is a terrible idea to pass pointers through mqueues, since a pointer is only valid in its own process, while mqueues are intended to be used between processes. But in the end you are passing ints. It may work only because sizeof(void*) > sizeof(int) on most architectures.Onwards
@Juliano: Thanks, I was using sizeof(void *) where it should have been sizeof(int). This is just a synthetic example to show usage of mqueue. It demonstrates the contents of one integer moving through the queue into another integer where both are treated as buffers.Calabro
@Armardeep: sizeof(a) and sizeof(b) would be better than sizeof(int).Syringa
@camh: Agreed. I would also argue that an even better approach (which I would use in a production design) would be to define a message type and its size. Anything to be transported would have controlled methods to load/store the buffers and enforce the validity of the message once it went across.Calabro
the mq_open will fail, because the name doesn't start with /, so it should be "/test_queue"Wolframite
Thanks, it has been corrected. It works without the / in Cygwin. Linux and other real posix implementations require the prefix.Calabro
S
0

`mq_send(mq, (char *)(&a), sizeof(int), 1)` copies `sizeof(int)` bytes from the buffer at `&a`. It does not send the pointer to `a`; it carries the value of `a` from one process to another. The implementation is correct.

Shalna answered 13/6, 2013 at 13:49 Comment(0)
M
0

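The code below is for your reference. Note that it uses System V message queues (`msgget`/`msgsnd`/`msgrcv`) rather than the POSIX `mqueue` API: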

IPC_msgq_rcv.c

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#define MAXSIZE     128

/*
 * Report a fatal error and terminate.
 *
 * Prints s followed by the message for the current errno (via perror),
 * then exits with status 1. Does not return.
 * s is only read, so string literals can be passed safely.
 */
void die(const char *s)
{
  perror(s);
  exit(1);
}

/* Message buffer passed to msgrcv(): a type tag followed by the payload. */
struct msgbuf
{
    long    mtype;              /* message type; this program receives type 1 */
    char    mtext[MAXSIZE];     /* message text, at most MAXSIZE bytes */
};


/*
 * Receiver: looks up the existing System V message queue with key 1234,
 * takes one message of type 1 from it and prints its text.
 *
 * Exits through die() if the queue cannot be found or the receive fails.
 * Returns 0 on success.
 */
int main(void)
{
    const key_t key = 1234;     /* must match the key used by the sender */
    struct msgbuf rcvbuffer;
    ssize_t received;
    int msqid;

    /* No IPC_CREAT: the queue has to exist already. */
    msqid = msgget(key, 0666);
    if (msqid < 0)
      die("msgget()");

    /* Receive a message of type 1; flags 0 means block until one arrives. */
    received = msgrcv(msqid, &rcvbuffer, MAXSIZE, 1, 0);
    if (received < 0)
      die("msgrcv");

    /*
     * Bound the print by the number of bytes actually received, so a payload
     * without a terminating NUL cannot make printf read past the buffer.
     */
    printf("%.*s\n", (int)received, rcvbuffer.mtext);
    return 0;
}

IPC_msgq_send.c

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#define MAXSIZE     128

/*
 * Report a fatal error and terminate.
 *
 * Prints s followed by the message for the current errno (via perror),
 * then exits with status 1. Does not return.
 * s is only read, so string literals can be passed safely.
 */
void die(const char *s)
{
  perror(s);
  exit(1);
}

/* Message buffer passed to msgsnd(): a type tag followed by the payload. */
struct msgbuf
{
    long    mtype;              /* message type; this program sends type 1 */
    char    mtext[MAXSIZE];     /* message text, at most MAXSIZE bytes */
};

/*
 * Sender: gets (creating it if necessary) the System V message queue with
 * key 1234, reads one line from stdin and sends it as a message of type 1.
 *
 * Exits through die() if the queue cannot be obtained or the send fails,
 * and with status 1 if no input could be read. Returns 0 on success.
 */
int main(void)
{
    const int msgflg = IPC_CREAT | 0666;
    const key_t key = 1234;
    struct msgbuf sbuf;
    size_t buflen;
    int msqid;

    if ((msqid = msgget(key, msgflg)) < 0)   //Get the message queue ID for the given key
      die("msgget");

    //Message Type
    sbuf.mtype = 1;

    printf("Enter a message to add to message queue : ");
    fflush(stdout);

    // fgets never writes more than sizeof sbuf.mtext bytes, unlike an
    // unbounded scanf("%[^\n]"), and it also accepts an empty line.
    if (fgets(sbuf.mtext, sizeof sbuf.mtext, stdin) == NULL)
    {
        fprintf(stderr, "No input read.\n");
        exit(1);
    }

    // Drop the newline kept by fgets so only the typed text is sent.
    sbuf.mtext[strcspn(sbuf.mtext, "\n")] = '\0';

    // +1 sends the terminating NUL too, so the receiver gets a C string.
    buflen = strlen(sbuf.mtext) + 1 ;

    // IPC_NOWAIT: fail immediately instead of blocking when the queue is full.
    if (msgsnd(msqid, &sbuf, buflen, IPC_NOWAIT) < 0)
    {
        printf ("%d, %ld, %s, %d \n", msqid, sbuf.mtype, sbuf.mtext, (int)buflen);
        die("msgsnd");
    }

    else
        printf("Message Sent\n");

    return 0;
}

Compile each source file to get a writer (sender) executable and a reader (receiver) executable, as shown below:

gcc -o MQsender IPC_msgq_send.c

gcc -o MQreceiver IPC_msgq_rcv.c

By running the two binaries, you can send a message and then read it back from the message queue. You can also inspect the state of the message queue at different points by running:

ipcs -q

On your Linux system, you can list the details of all IPC mechanisms currently in use (message queues, shared memory segments and semaphores) with:

ipcs -a

Reference Blog

Moonshiner answered 8/9, 2015 at 13:6 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.