Weird Hazelcast IMap#put() behaviour

My Hazelcast-based program can work in two modes: submitter and worker.

The submitter puts a POJO into the distributed map under some key, e.g.: hazelcastInstance.getMap(MAP_NAME).put(key, value);

The worker has an infinite loop (with Thread.sleep(1000L); inside as a pause) which must process entries from the map. For now I'm just printing the map size in this loop.

Now here's the problem. I start the worker app. Then I start four submitters simultaneously (each adds an entry to the map and terminates its work). But after all submitter apps are done, the worker app prints an arbitrary size: sometimes it detects that only one entry was added, sometimes two, sometimes three (it has actually never seen all four entries).

What is the problem with this simple flow? I've read in the Hazelcast docs that the put() method is synchronous, so it guarantees that after it returns, the entry is placed into the distributed map and replicated. But that doesn't seem to be the case in my experiment.

UPD (code)

Submitter:

public void submit(String key) {
    Object mySerializableObject = ...
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
    // insert only if the key is absent; the entry expires after TASK_TTL_IN_HOURS hours
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}

Worker:

public void process() {
    while (true) {
        IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
        System.out.println(map.size());

        // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
        // objectToProcess.ifPresent(obj -> processObject(id, obj));
        try {
            Thread.sleep(PAUSE);
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

I commented out the "processing" part itself, because right now I'm just trying to get a consistent state of the map. The code above prints different results each time, e.g.: "4, 3, 1, 1, 1, 1, 1..." (so it can even see the 4 submitted tasks for a moment, but then they... disappear).

UPD (log)

Worker:

...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...

Submitter 1:

Before: tasksMap.size() = 0
After: tasksMap.size() = 1

Submitter 2:

Before: tasksMap.size() = 1
After: tasksMap.size() = 4

Submitter 3:

Before: tasksMap.size() = 1
After: tasksMap.size() = 2

Submitter 4:

Before: tasksMap.size() = 3
After: tasksMap.size() = 4
Unleash answered 24/4, 2016 at 20:29 Comment(7)
The IMap::size method is an estimate; anyhow, it should stabilize eventually. Can you share some more code?Soccer
@Soccer, I've updated the question.Unleash
Is your code using embedded members, and do they actually stop after submitting the value? I could imagine the members leaving the cluster too quickly to fulfil the backup requirements on leave.Soccer
@DmytroTitov Are your Submitter and Worker processes using the Hazelcast client API to connect to the cluster, or are they running on the nodes of the cluster themselves? What is the reported size if you print it before and after map.putIfAbsent in the Submitter processes?Laurenlaurena
@Laurenlaurena They are running on the same cluster. I logged the map size on the worker and on the four submitters (see the update to the question).Unleash
@Soccer I don't know what an "embedded member" is, but yes, after performing putIfAbsent the submitter application terminates via System.exit().Unleash
Embedded member means you start your Hazelcast nodes (cluster members) inside your applications. Data is partitioned, therefore if you kill members too fast, backups cannot be synced and you lose data.Soccer
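
To make the embedded-member vs. client distinction from the comments concrete, here is a minimal sketch (the class name and map name are illustrative; it assumes a Hazelcast 3.x setup with a cluster to connect to): a client JVM owns no data partitions, so terminating it cannot take map data with it, whereas an abruptly killed embedded member can.

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class MemberVsClient {

    public static void main(String[] args) {
        // Embedded member: this JVM becomes a cluster node and owns data partitions.
        // Killing it right after a put() may lose entries whose backups are not synced yet.
        HazelcastInstance member = Hazelcast.newHazelcastInstance();

        // Client: connects to an existing cluster but stores no partitions itself,
        // so terminating this JVM cannot discard map data held by the cluster.
        HazelcastInstance client = HazelcastClient.newHazelcastClient();

        client.getMap("tasksMap").put("key", "value");

        // Leave the cluster gracefully instead of System.exit(),
        // giving Hazelcast a chance to migrate and sync partitions.
        client.shutdown();
        member.shutdown();
    }
}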

Well, I guess I've figured out the problem. As far as I understand, the distributed IMap returned by hazelcastInstance.getMap doesn't guarantee that the data is replicated to all existing nodes in the cluster: some portions of the data may be stored on some nodes, other portions on other nodes. That's why in my example some of the submitted tasks ended up not on the worker node (which runs perpetually) but on other submitter nodes, which terminate their execution right after submission. Such entries were lost on the submitters' exit.

I solved this issue by replacing hazelcastInstance.getMap with hazelcastInstance.getReplicatedMap. This method returns a ReplicatedMap, which, AFAIK, guarantees that entries placed into it will be replicated to all nodes of the cluster. So now everything works fine in my system.
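
For reference, a minimal sketch of the submitter after this change (assuming Hazelcast 3.x, where ReplicatedMap lives in com.hazelcast.core and offers a put overload with a per-entry TTL; the class name is illustrative, the map name follows the tasksMap seen in the logs, and the TTL constant value is a placeholder):

import java.util.concurrent.TimeUnit;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ReplicatedMap;

public class TaskSubmitter {

    private static final String MAP_NAME = "tasksMap";
    private static final long TASK_TTL_IN_HOURS = 1; // illustrative value

    private final HazelcastInstance hazelcastInstance;

    public TaskSubmitter(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    public void submit(String key, Object mySerializableObject) {
        // Every member keeps a full copy of a ReplicatedMap, so the entry
        // survives even if the submitting member leaves right after this call.
        ReplicatedMap<String, Object> map = hazelcastInstance.getReplicatedMap(MAP_NAME);
        map.put(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
    }
}

The trade-off is that ReplicatedMap stores the whole data set on every member, which costs memory and network traffic; for a small task map this is usually acceptable, while the alternative would be to keep IMap with backups and shut the submitters down gracefully.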

Unleash answered 30/4, 2016 at 13:17 Comment(0)
