Hazelcast地图同步

5

我正在尝试在我的应用程序中使用Hazelcast实现分布式缓存。 我正在使用Hazelcast的IMap。 我遇到的问题是每次从映射中获取值并更新该值时,我需要再次执行put(key, value)操作。 如果我的值对象有10个属性并且我必须更新所有10个属性,则我必须调用put(key, value) 10次。 就像这样 -

IMap<Integer, Employee> mapEmployees = hz.getMap("employees");
Employee emp1 = mapEmployees.get(100);
emp1.setAge(30);
mapEmployees.put(100, emp1);
emp1.setSex(“F”);
mapEmployees.put(100, emp1);
emp1.setSalary(5000);
mapEmployees.put(100, emp1);

如果我不这样做,那么操作相同员工对象的另一个节点将对其进行更新,最终结果是员工对象未同步。有没有避免多次明确调用put的解决方案?在ConcurrentHashMap中,我不需要这样做,因为如果我更改了对象,则该映射表也会被更新。

2
Hazelcast会为您提供对象的克隆版本(因为它以二进制/序列化形式存储在集群中)。为了使更新对其他节点/线程可见,您应该将其放回。此外,如果您的Employee类不是线程安全的,则JDK ConcurrentHashMap无法保证其他线程能够看到更新(即使您将其放回到映射中)。因此,在任何情况下,您都应该使用同步机制。 - mdogan
5个回答

9
从版本3.3开始,您将需要使用EntryProcessor:
您真正想要做的是构建一个EntryProcessor ,并使用以下方式调用它: mapEmployees.executeOnKey(100,new EmployeeUpdateEntryProcessor(new ObjectContainingUpdatedFields(30,“F”,5000));
这样,Hazelcast会在该Employee对象的键上处理地图锁定,并允许您运行EntryProcessor中process()方法中的任何代码,包括更新映射中的值。
因此,您将使用自定义构造函数实现EntryProcessor,该构造函数接受包含要更新的所有属性的对象,然后在process()中构造最终的Employee对象,该对象最终将出现在映射中,并进行entry.setValue()。不要忘记为EmployeeUpdateEntryProcessor创建新的StreamSerializer,以便可以序列化Employee对象,以便不会陷入java.io序列化。
来源: http://docs.hazelcast.org/docs/3.5/manual/html/entryprocessor.html

2

也许你需要使用事务,或者你可能想看一下分布式锁

请注意,在你的解决方案中,如果这段代码被两个线程运行,其中一个线程所做的更改将被覆盖。


1

这个可能会引起您的兴趣。

你可以像这样为你的Employee类做一些事情(只有一个实例变量的简化代码):

public final class Employee
    implements Frozen<Builder>
{
    private final int salary;

    private Employee(Builder builder)
    {
        salary = builder.salary;
    }

    public static Builder newBuilder()
    {
        return new Builder();
    }

    @Override
    public Builder thaw()
    {
        return new Builder(this);
    }

    public static final class Builder
        implements Thawed<Employee>
    {
        private int salary;

        private Builder()
        {
        }

        private Builder(Employee employee)
        {
            salary = employee.salary;
        }

        public Builder withSalary(int salary)
        {
            this.salary = salary;
            return this;
        }

        @Override
        public Employee freeze()
        {
            return new Employee(this);
        }
    }
}

这样,要修改您的缓存,您需要:
Employee victim = map.get(100);
map.put(100, victim.thaw().withSalary(whatever).freeze());

这是一个完全的原子操作。


1
这不是一个原子操作,因为你需要读取、修改和写入。如果你真的想要一个原子操作,而不使用锁或事务,可以采用以下方法:for(;;){ Person oldPerson = map.get("foo"); Person newPerson = new Person(oldPerson); newPerson.incAge(); if(map.replace("foo",oldPerson,newPerson)) break; }抱歉排版不好。 - pveentjer

0

0
如果有可能另一个节点可以更新您的节点正在处理的数据,那么使用put()将覆盖另一个节点所做的更改。通常这是不希望的行为,因为它会导致数据丢失和不一致的数据状态。
看一下IMap.replace()方法和其他ConcurrentMap相关的方法。如果replace()失败,则遇到了更改冲突。在这种情况下,您应该再尝试一次:
  1. 从hazelcast重新读取条目
  2. 更新其字段
  3. 用replace保存到hazelcast
经过一些失败的尝试后,您可以将StorageException抛出到上一级。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接