Multithreading with mutexes in C and running one thread at a time

I have an array of 100 requests (integers). I want to create 4 threads, each of which runs a function (thread_function), and I want the threads to take the requests one by one:

(thread0->request0,
thread1->request1,
thread2->request2,
thread3->request3,
and then thread0->request4, etc., up to 100), all of this using mutexes. Here is the code I have written so far:

threadRes = pthread_create(&(threadID[i]), NULL,thread_function, (void *)id_size);

This is inside my main(), in a loop that runs 4 times. Now, outside my main():

void *thread_function(void *arg){
    int *val_p = (int *) arg;
    for (i = 0; i < 200; i = i + 2)
    {
        f = false;

        for (j = 0; j < 100; j++)
        {
            if (val_p[i] == cache[j].id)
                f = true;
        }
        if (f == true)
        {
            printf("The request %d has been served.\n", val_p[i]);
        }
        else
        {
            cache[k].id = val_p[i];
            printf("\nCurrent request to be served:%d \n", cache[k].id);
            k++;
        }
    }
    return NULL;
}

Here, val_p is the array with the requests and cache is an array of structs that stores the ids (the requests).

So now I want mutexes to synchronize my threads. I considered using this inside my main():

pthread_join(threadID[0], NULL);
pthread_join(threadID[1], NULL);
pthread_join(threadID[2], NULL);
pthread_join(threadID[3], NULL);

pthread_mutex_destroy(&mutex);

and this inside the function:

pthread_mutex_lock(&mutex);
pthread_mutex_unlock(&mutex);
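
For context, here is roughly how the main()-side pieces fit together. This is only a sketch: how the mutex is initialized, and the declarations of threadID and id_size, are assumptions based on what I described rather than my exact code.

#include <stdio.h>
#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  /* shared by all threads */
pthread_t threadID[4];
int id_size[200];                 /* the 100 requests (id/size pairs), filled elsewhere */

void *thread_function(void *arg); /* the function shown above */

int main(void)
{
    /* create the 4 worker threads, all working on the same request array */
    for (int i = 0; i < 4; i++) {
        int threadRes = pthread_create(&(threadID[i]), NULL,
                                       thread_function, (void *)id_size);
        if (threadRes != 0) {
            fprintf(stderr, "pthread_create failed\n");
            return 1;
        }
    }

    /* wait for all 4 threads before destroying the mutex */
    for (int i = 0; i < 4; i++)
        pthread_join(threadID[i], NULL);

    pthread_mutex_destroy(&mutex);
    return 0;
}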

Before I finish, I would like to say that so far my program's result is that the 4 threads serve 100 requests each (400 total), and what I want to achieve is that the 4 threads serve 100 requests in total.

Thanks for your time.

Ocker answered 15/1, 2021 at 20:24 Comment(6)
Everything you've said sounds reasonable. Yes, main() should join each thread before it terminates. Yes, you should create a mutex and use it to protect access to any shared data that is modified by any thread after the first call to pthread_create. Yes, that involves calls to pthread_mutex_lock() and pthread_mutex_unlock(). But what is your actual question?Juryman
Thanks, what I don't know is where to put the lock and unlock. Should I put them inside main? Inside the function? Or maybe both?Ocker
Each thread must lock the mutex prior to any access, read or write, to any of the shared variables that the mutex protects. It must unlock the mutex at some later point to allow other threads access, and also before it itself tries to lock the mutex again. Within those broad parameters, there are many specific possibilities.Juryman
With that said, although using a mutex or similar synchronization object is necessary, it is not clear to me whether any particular pattern of mutex usage would be sufficient, by itself, to solve your problem. I do not follow your code and data structure well enough to perform such an analysis.Juryman
Thanks for your time. I will continue debugging with what you said!Ocker
mutex might not be enough. Try condition variables.Consolatory

You may want to post more of your code. How the arrays are set up, how the segment is passed to the individual threads, etc.

Note that using printf will perturb the timing of the threads. It does its own mutex for access to stdout, so it's probably better to no-op this. Or, have a set of per-thread logfiles so the printf calls don't block against one another.

Also, in your thread loop, once you set f to true, you can issue a break as there's no need to scan further.

val_p[i] is loop invariant, so we can fetch that just once at the start of the i loop.

We don't see k and cache, but you'd need to mutex wrap the code that sets these values.

But, that does not protect against races in the for loop. You'd have to wrap the fetch of cache[j].id in a mutex pair inside the loop. You might be okay without the mutex inside the loop on some arches that have good cache snooping (e.g. x86).
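
To make that concrete, a coarser-grained mutex version of your loop body could look something like the sketch below. k, cache, and val_p are from your code; the cache_lock name is just for illustration. This sketch keeps the scan and the insert under one lock, which also closes the window where two threads could both miss the same request and then both insert it.

#include <stdbool.h>
#include <stdio.h>
#include <pthread.h>

/* shared state from the question */
struct cache { int id; };
struct cache cache[100];
int k;

/* one lock protecting both k and cache[] */
pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

void *
thread_function(void *arg)
{
    int *val_p = arg;

    for (int i = 0; i < 200; i += 2) {
        int pval = val_p[i];
        bool found = false;

        /* scan and (possibly) insert under the same lock so the
           check-then-insert sequence is atomic with respect to other threads */
        pthread_mutex_lock(&cache_lock);

        for (int j = 0; j < 100; j++) {
            if (cache[j].id == pval) {
                found = true;
                break;
            }
        }

        if (!found) {
            cache[k].id = pval;     /* claim the next free slot */
            k++;
        }

        pthread_mutex_unlock(&cache_lock);

        /* print outside the lock so the threads don't also serialize on output */
        if (found)
            printf("The request %d has been served.\n", pval);
        else
            printf("\nCurrent request to be served:%d\n", pval);
    }

    return NULL;
}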


You might be better off using stdatomic.h primitives. Here's a version that illustrates that. It compiles but I've not tested it:

#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>

_Atomic int k;
#define true    1
#define false   0

struct cache {
    _Atomic int id;
};

struct cache cache[100];

#ifdef DEBUG
#define dbgprt(_fmt...) \
    printf(_fmt)
#else
#define dbgprt(_fmt...) \
    do { } while (0)
#endif

void *
thread_function(void *arg)
{
    int *val_p = arg;
    int i;
    int j;
    int cval;
    _Atomic int *cptr;

    for (i = 0; i < 200; i += 2) {
        int pval = val_p[i];
        int f = false;

        // decide if request has already been served
        for (j = 0; j < 100; j++) {
            cptr = &cache[j].id;
            cval = atomic_load(cptr);
            if (cval == pval) {
                f = true;
                break;
            }
        }

        if (f == true) {
            dbgprt("The request %d has been served.\n",pval);
            continue;
        }

        // increment the global k value [atomically]
        int kold = atomic_load(&k);
        int knew;
        while (1) {
            knew = kold + 1;
            if (atomic_compare_exchange_strong(&k,&kold,knew))
                break;
        }

        // get current cache value
        cptr = &cache[kold].id;
        int oldval = atomic_load(cptr);

        // mark the cache
        // this should never loop because we atomically got our slot with
        // the k value
        while (1) {
            if (atomic_compare_exchange_strong(cptr,&oldval,pval))
                break;
        }

        dbgprt("\nCurrent request to be served:%d\n",pval);
    }

    return (void *) 0;
}
Gallon answered 15/1, 2021 at 22:1 Comment(2)
I used your way and it helped a lot. I didn't know about the existence of atomics!Ocker
For how to implement a mutex [ticket lock] with atomics, see my answer: #41915322 It also has a bunch more info that may be helpful.Gallon

You need to use a loop that looks like this:

  1. Acquire lock.
  2. See if there's any work to be done. If not, release the lock and terminate.
  3. Mark the work that we're going to do as not needing to be done anymore.
  4. Release the lock.
  5. Do the work.
  6. (If necessary) Acquire the lock. Mark the work done and/or report results. Release the lock.
  7. Go to step 1.

Notice how while holding the lock, the thread discovers what work it should do and then prevents any other thread from taking the same assignment before it releases the lock. Note also that the lock is not held while doing the work so that multiple threads can work concurrently.
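
Applied to your setup, that pattern could look roughly like the sketch below. The names here (requests, next_request, serve) are placeholders rather than your actual variables; the key point is that the shared index is only read and advanced while the lock is held.

#include <stdio.h>
#include <pthread.h>

#define NUM_REQUESTS 100

int requests[NUM_REQUESTS];   /* filled in before the threads are created */
int next_request = 0;         /* index of the next unclaimed request */
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

static void serve(int request)
{
    /* stand-in for the real per-request work */
    printf("Current request to be served: %d\n", request);
}

void *thread_function(void *arg)
{
    (void)arg;

    for (;;) {
        /* 1. acquire the lock */
        pthread_mutex_lock(&lock);

        /* 2. any work left? if not, release the lock and terminate */
        if (next_request >= NUM_REQUESTS) {
            pthread_mutex_unlock(&lock);
            break;
        }

        /* 3. claim this request so no other thread can take it */
        int mine = requests[next_request];
        next_request++;

        /* 4. release the lock */
        pthread_mutex_unlock(&lock);

        /* 5. do the work without holding the lock */
        serve(mine);

        /* 6./7. nothing extra to report back, so just loop to step 1 */
    }

    return NULL;
}

This way each thread pulls requests one at a time, so the four threads serve 100 requests in total rather than 100 each.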

Message answered 15/1, 2021 at 21:18 Comment(4)
So from your answer I understand that I may have to use the lock/unlock calls multiple times, right?Ocker
@Ocker You certainly might. Though you can avoid it in this loop. You can avoid the unlock at the end of step 6 and jump to step 2 instead of step 1. But there's nothing wrong with needing to lock more than once.Message
In the end, using lock and unlock more than once was the key to completing it. However, the code below was useful too.Ocker
The most important thing is that you figure out what work you need to do and prevent any other thread from doing that same work without releasing the lock.Message