I need Mutex in Dart
Asked Answered
H

3

15

I have an application that use a lot of socketio request and I make bulk request. When receiving the data, I have to iterate through a list to add and remove item:

List carlist;

void receiveData(Map data) {
  // Need a lock here
  for (var i = 0; i < carlist.lenght;) {
    if (data[carlist[i]['name']]['color'] == carlist[i]['color']) {
      carlist.removeAt(i);
      onDeleteCar(data); // Update the UI.
    }
  }
  // Need a release lock here
}

Because I need to modified the list, I can't work without mutex in here.

Anyone have an idea?

Hake answered 31/7, 2014 at 19:16 Comment(1)
In answer to myself and following a discussion with few Dart guru, all ajax call are all processed in a single thread. This mean that at no time, two pieces of that code can run at the same time even if it come from an system event. Webworker can run at the same time which was making me thinking that chrome went multithread, but they don't share variables. You need to use ReceivePort and SendPort class to communicate with them. So, the code above will not need lock in Javascript/Dart, but any other language, it will.Hake
P
19

Dart has no concurrent threads so a simple boolean variable can work as a mutex.
If you need another async execution to wait until the lock is released and continue then instead of just return, it becomes a bit more complicated.

bool mutex = false;

void receiveData(Map data) {
  // Need a lock here
  if(mutex) {
    return;
  } else {
    mutex = true;
  }

  for (var i = 0; i < carlist.lenght;) {
    if (data[carlist[i]['name']]['color'] == carlist[i]['color']) {
      carlist.removeAt(i);
      onDeleteCar(data); // Update the UI.
    }
  }
  // Need a release lock here
  mutex = false;
}

If your method is called from an event handler that produces a lot of events very fast and you want to process each execution but still only one by one you can queue the calls like:

import 'dart:async' as async;
import 'dart:collection' as coll;
import 'dart:math' as math;

void main() {
  int j = 0;
  int delay = 0;
  for(int i = 0; i < 100; i++) {
    new async.Future.delayed(
        new Duration(milliseconds: delay += rnd.nextInt(150)),
        () => receiveData({'${j++}': j}, j)
    ).catchError((e) => print('Error: $e'));
  }
}

//List carlist = [];

WorkQueue workQueue = new WorkQueue(timeout: new Duration(milliseconds: 270));
math.Random rnd = new math.Random();

// had to made this async to make the timeout functionality work
// otherwise Dart would have evaluated the timeouts only after all other
// code has already been finished
async.Future receiveData(Map data, int id) {

  // ensure that the code passed to add isn't entered by another async thread
  // while it is already executed
  // id is just for debugging purposes
  return workQueue.add(id, () {

    async.Completer completer = new async.Completer();
    print('$id start');
//    for (var i = 0; i < carlist.length;) {
//      carlist.add(data);
//      if (data[carlist[i]['name']]['color'] == carlist[i]['color']) {
//        carlist.removeAt(i);
//        onDeleteCar(data); // Update the UI.
//      }
//    }

    // dummy task to burn time
    new async.Future.delayed(new Duration(milliseconds: rnd.nextInt(100)), () {
      var val = rnd.nextDouble();
      // add some fun - check if code that throws is handled correctly
      if(val > 0.9) {
        completer.completeError('artifical error ${id}');
        print('$id end with error');
      } else {
        completer.complete();
        print('$id end');
      }
    });

    return completer.future;
  });
  // Need a release lock here
}

// just holds a completer and a closure
class Task {
  // complete after f was executed
  async.Completer completer = new async.Completer();
  // this code should only be entered by one thread at a time
  Function f;
  // only for debugging purposes
  int id;
  // ignore timeout if f has already been invoked
  bool isInvoked = false;

  Task(this.id, this.f, Duration timeout){
    if(timeout != null) {
      completer.future.timeout(timeout, onTimeout: () {
        if(!completer.isCompleted && !isInvoked) {
          completer.completeError('${id} timed out');
        }
      })
      // future.timeout creates a new Future which also throws when
      // the completer is completed with completeError
      // not handling this error ends the app with unhandled exception
      .catchError((_) {});
    }
  }
}

class WorkQueue {
  // enque all calls
  coll.Queue q = new coll.Queue();
  // currently executing?
  bool isExecuting = false;

  // throw when the execution is delayed longer than the provide timeout
  Duration timeout;

  WorkQueue({this.timeout});

  // enqueue a new execution
  async.Future add(int id, Function f) {
    print('add $id - queue length: ${q.length}');
    var t = new Task(id, f, timeout);
    q.add(t);
    // ensure that the queue is processed
    new async.Future(release);
    return t.completer.future;
  }

  // execute next waiting thread if any
  void release() {
    // do nothing if closure is currently being executed or queue is empty
    if(!isExecuting && !q.isEmpty) {
      isExecuting = true;
      Task t = q.removeFirst();
      // check if t hasn't alredy timed out
      if(!t.completer.isCompleted) {
        // ignore timeout because we are already being invoked
        t.isInvoked = true;
        // invoke the closure
        new async.Future(t.f)
        // handle errors in closure
        .catchError((e) {
          t.completer.completeError(e);
        })
        // process next Task in queue
        .then((_) {
          isExecuting = false;
          new async.Future(release);
          if(!t.completer.isCompleted) {
            t.completer.complete();
          }
        });
      }
    }
  }
}

example output

add 1 - queue length: 0
1 start
1 end
add 2 - queue length: 0
2 start
add 3 - queue length: 0
2 end
3 start
3 end
add 4 - queue length: 0
4 start
4 end with error
Error: artifical error 4
add 5 - queue length: 0
5 start
5 end
add 6 - queue length: 0
6 start
add 7 - queue length: 0
6 end
7 start
7 end
add 8 - queue length: 0
8 start
add 9 - queue length: 0
add 10 - queue length: 1
8 end with error
Error: artifical error 8
9 start
9 end
10 start
add 11 - queue length: 0
10 end
11 start
add 12 - queue length: 0
add 13 - queue length: 1
11 end
12 start
12 end
13 start
13 end
add 14 - queue length: 0
14 start
14 end
add 15 - queue length: 0
15 start
add 16 - queue length: 0
15 end
16 start
16 end with error
Error: artifical error 16
add 17 - queue length: 0
17 start
17 end
add 18 - queue length: 0
18 start
18 end
add 19 - queue length: 0
19 start
add 20 - queue length: 0
19 end
20 start
add 21 - queue length: 0
20 end
21 start
add 22 - queue length: 0
21 end
22 start
22 end
add 23 - queue length: 0
23 start
23 end
add 24 - queue length: 0
24 start
add 25 - queue length: 0
add 26 - queue length: 1
add 27 - queue length: 2
24 end with error
Error: artifical error 24
25 start
add 28 - queue length: 2
add 29 - queue length: 3
25 end
26 start
add 30 - queue length: 3
26 end
27 start
27 end
28 start
add 31 - queue length: 2
28 end
29 start
29 end
30 start
30 end
31 start
add 32 - queue length: 0
31 end
32 start
32 end
add 33 - queue length: 0
33 start
33 end
add 34 - queue length: 0
34 start
34 end
add 35 - queue length: 0
35 start
35 end
add 36 - queue length: 0
36 start
add 37 - queue length: 0
36 end
37 start
add 38 - queue length: 0
37 end
38 start
add 39 - queue length: 0
38 end with error
Error: artifical error 38
39 start
39 end with error
Error: artifical error 39
add 40 - queue length: 0
40 start
add 41 - queue length: 0
40 end
41 start
41 end with error
Error: artifical error 41
add 42 - queue length: 0
42 start
add 43 - queue length: 0
add 44 - queue length: 1
add 45 - queue length: 2
42 end with error
Error: artifical error 42
43 start
add 46 - queue length: 2
43 end
44 start
add 47 - queue length: 2
add 48 - queue length: 3
add 49 - queue length: 4
44 end
45 start
add 50 - queue length: 4
45 end with error
Error: artifical error 45
46 start
46 end
47 start
add 51 - queue length: 3
47 end
48 start
Error: 49 timed out
48 end
add 52 - queue length: 2
Error: 50 timed out
add 53 - queue length: 3
add 54 - queue length: 4
add 55 - queue length: 5
Error: 51 timed out
add 56 - queue length: 6
Error: 52 timed out
add 57 - queue length: 7
Error: 53 timed out
Error: 54 timed out
add 58 - queue length: 8
Error: 55 timed out
add 59 - queue length: 9
Error: 56 timed out
Error: 57 timed out
add 60 - queue length: 10
Error: 58 timed out
add 61 - queue length: 11
Error: 59 timed out
add 62 - queue length: 12
add 63 - queue length: 13
Error: 60 timed out
add 64 - queue length: 14
Error: 61 timed out
Error: 62 timed out
add 65 - queue length: 15
Error: 63 timed out
add 66 - queue length: 16
Error: 64 timed out
add 67 - queue length: 17
Error: 65 timed out
add 68 - queue length: 18
Error: 66 timed out
add 69 - queue length: 19
add 70 - queue length: 20
add 71 - queue length: 21
Error: 67 timed out
add 72 - queue length: 22
Error: 68 timed out
add 73 - queue length: 23
Error: 69 timed out
Error: 70 timed out
Error: 71 timed out
add 74 - queue length: 24
Error: 72 timed out
Error: 73 timed out
add 75 - queue length: 25
add 76 - queue length: 26
Error: 74 timed out
add 77 - queue length: 27
add 78 - queue length: 28
Error: 75 timed out
Error: 76 timed out
add 79 - queue length: 29
add 80 - queue length: 30
Error: 77 timed out
Error: 78 timed out
add 81 - queue length: 31
add 82 - queue length: 32
add 83 - queue length: 33
Error: 79 timed out
Error: 80 timed out
add 84 - queue length: 34
Error: 81 timed out
add 85 - queue length: 35
Error: 82 timed out
Error: 83 timed out
add 86 - queue length: 36
Error: 84 timed out
add 87 - queue length: 37
Error: 85 timed out
add 88 - queue length: 38
add 89 - queue length: 39
add 90 - queue length: 40
add 91 - queue length: 41
Error: 86 timed out
add 92 - queue length: 42
Error: 87 timed out
Error: 88 timed out
add 93 - queue length: 43
Error: 89 timed out
Error: 90 timed out
Error: 91 timed out
Error: 92 timed out
add 94 - queue length: 44
add 95 - queue length: 45
add 96 - queue length: 46
add 97 - queue length: 47
Error: 93 timed out
add 98 - queue length: 48
Error: 94 timed out
Error: 95 timed out
Error: 96 timed out
add 99 - queue length: 49
Error: 97 timed out
add 100 - queue length: 50
Error: 98 timed out
Error: 99 timed out
Error: 100 timed out
Population answered 31/7, 2014 at 19:27 Comment(0)
S
3

Just use package Mutex!

flutter pub add mutex

List carlist;
final locker = Mutex();

void receiveData(Map data) {
  // Need a lock here
  locker.acquire();
  try {
    for (var i = 0; i < carlist.lenght;) {
      if (data[carlist[i]['name']]['color'] == carlist[i]['color']) {
        carlist.removeAt(i);
        onDeleteCar(data); // Update the UI.
      }
    }
  } finally {
    // Need a release lock here
    locker.release();
  }
}
Sidelong answered 24/1, 2023 at 10:33 Comment(0)
G
3

Here is a simple solution if used in an async function:

bool mutex = false;

Future<void> fun() async {
  while (mutex) {
    await Future.delayed(const Duration(milliseconds: 50));
  }
  mutex = true;
  try {
    // do something
  } finally {
    mutex = false;
  }
}
Gimbals answered 25/3, 2023 at 8:46 Comment(1)
Precisely what I had in mind + a timeout for the while()Dupleix

© 2022 - 2024 — McMap. All rights reserved.