Hazelcast map synchronization
Asked Answered
T

5

5

I am trying to implement distributed cache using Hazelcast in my application. I am using Hazelcast’s IMap. The problem I have is every time I get a value from a map and update the value, I need to do a put(key, value) again. If my value object has 10 properties and I have to update all 10, then I have to call put(key, value) 10 times. Something like -

IMap<Integer, Employee> mapEmployees = hz.getMap("employees");
Employee emp1 = mapEmployees.get(100);
emp1.setAge(30);
mapEmployees.put(100, emp1);
emp1.setSex(“F”);
mapEmployees.put(100, emp1);
emp1.setSalary(5000);
mapEmployees.put(100, emp1);

If I don’t do this way, some other node which operates on the same Employee object will update it and the final result is that the employee object is not synchronized. Is there any solution to avoid calling put explicitly multiple times? In a ConcurrentHashMap, I don’t need to do this because if I change the object, the map also gets updated.

Triennium answered 19/6, 2013 at 4:22 Comment(1)
Hazelcast gives you a clone of the object (since it's stores in cluster as binary/serialized). To make the updates visible to the other nodes/threads you should put it back. Also if your Employee class is not thread-safe then JDK ConcurrentHashMap cannot guarantee visibility of updates to the other threads (sometimes even if you put it back to the map). So in either cases you should use a synchronization mechanism.Foodstuff
I
10

As of version 3.3 you'll want to use an EntryProcessor:

What you really want to do here is build an EntryProcessor<Integer, Employee> and call it using mapEmployees.executeOnKey( 100, new EmployeeUpdateEntryProcessor( new ObjectContainingUpdatedFields( 30, "F", 5000 ) );

This way, Hazelcast handles locking the map on the key for that Employee object and allows you to run whatever code is in the EntryProcessor's process() method atomically including updating values in the map.

So you'd implement EntryProcessor with a custom constructor that takes an object that contains all of the properties you want to update, then in process() you construct the final Employee object that will end up in the map and do an entry.setValue(). Don't forget to create a new StreamSerializer for the EmployeeUpdateEntryProcessor that can serialize Employee objects so that you don't get stuck with java.io serialization.

Source: http://docs.hazelcast.org/docs/3.5/manual/html/entryprocessor.html

Inexpressive answered 26/10, 2015 at 16:54 Comment(0)
T
2

Probably a transaction is what you need. Or you may want to take a look at distributed lock.

Note that in your solution if this code is ran by two threads changes made by one of them will be overwriten.

Tip answered 19/6, 2013 at 6:3 Comment(0)
T
1

This may interest you.

You could do something like this for your Employee class (simplified code with one instance variable only):

public final class Employee
    implements Frozen<Builder>
{
    private final int salary;

    private Employee(Builder builder)
    {
        salary = builder.salary;
    }

    public static Builder newBuilder()
    {
        return new Builder();
    }

    @Override
    public Builder thaw()
    {
        return new Builder(this);
    }

    public static final class Builder
        implements Thawed<Employee>
    {
        private int salary;

        private Builder()
        {
        }

        private Builder(Employee employee)
        {
            salary = employee.salary;
        }

        public Builder withSalary(int salary)
        {
            this.salary = salary;
            return this;
        }

        @Override
        public Employee freeze()
        {
            return new Employee(this);
        }
    }
}

This way, to modify your cache, you would:

Employee victim = map.get(100);
map.put(100, victim.thaw().withSalary(whatever).freeze());

This is a completely atomic operation.

Too answered 19/6, 2013 at 4:39 Comment(1)
This is not an atomic operation, since you have a read, modify, write. If you really want to have an atomic operation, without resorting to a lock or a transaction, you can use the following approach: ' for(;;){ Person oldPerson = map.get("foo"); Person newPerson = new Person(oldPerson); newPerson.incAge(); if(map.replace("foo",oldPerson,newPerson)) break; } ' sorry for the shitty layout..Kwangju
R
0

If there is possibility that another node can update data that your node is working with then using put() will overwrite changes made by another node. Usually it is unwanted behavior, cause it leads to data loss and inconsistent data state.

Take a look at IMap.replace() method and other ConcurrentMap related methods. If replace() is failed then you've faced changes collision. In this case you should give it another attempt:

  1. re-read entry from hazelcast
  2. update it's fields
  3. save to hazelcast with replace

After some failed attempts you can throw StorageException to the upper level.

Ruysdael answered 15/6, 2016 at 23:54 Comment(0)
H
0

You should use tryLock on your map entry :

    long timeout = 60;  // Define your own timeout 
    if (mapEmployees.tryLock(100, timeout, TimeUnits.SECONDS)){
       try {
          Employee emp1 = mapEmployees.get(100);
          emp1.setAge(30);
          emp1.setSex(“F”);
          emp1.setSalary(5000);
          mapEmployees.put(100, emp1);
       } finally {
           mapEmployees.unlock(100);
       }
   }else{
      // do something else like log.warn(...) 
   }

See : https://docs.hazelcast.com/imdg/4.2/data-structures/fencedlock#releasing-locks-with-trylock-timeout

Hartshorn answered 29/9, 2021 at 14:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.