IDEMPIERE-3653 1008833 Upgrade hazelcast to version 3.9.3 / integrate patch from hengsin

This commit is contained in:
Carlos Ruiz 2018-01-23 17:19:21 +08:00
parent 3bc0e45599
commit 4069878bf9
13 changed files with 258 additions and 184 deletions

View File

@ -2352,7 +2352,7 @@ public class GridTab implements DataStatusListener, Evaluatee, Serializable
return; // avoid NPE below
}
DataStatusListener[] listeners = m_listenerList.getListeners(DataStatusListener.class);
if (listeners.length == 0)
if (listeners.length == 0 || e == null)
return;
if (log.isLoggable(Level.FINE)) log.fine(e.toString());
// WHO Info

View File

@ -166,9 +166,9 @@ public class CacheMgt
int total = 0;
try {
Collection<Future<Integer>> results = futureMap.values();
for(Future<Integer> future : results) {
Integer i = future.get();
total += i.intValue();
for(Future<Integer> i : results)
{
total += i.get();
}
} catch (InterruptedException e) {
e.printStackTrace();

View File

@ -191,7 +191,11 @@ public class DefaultDesktop extends TabbedDesktop implements MenuListener, Seria
m_desktop.addListener(this);
//subscribing to broadcast event
bindEventManager();
ZKBroadCastManager.getBroadCastMgr();
try {
ZKBroadCastManager.getBroadCastMgr();
} catch (Throwable e) {
e.printStackTrace();
}
EventQueue<Event> queue = EventQueues.lookup(ACTIVITIES_EVENT_QUEUE, true);
queue.subscribe(this);

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2008-2012, Hazel Bilisim Ltd. All Rights Reserved.
~ Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@ -15,7 +15,13 @@
~ limitations under the License.
-->
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-2.4.xsd"
<!--
The default Hazelcast configuration. This is used when no hazelcast.xml is present.
Please see the schema for how to configure Hazelcast at https://hazelcast.com/schema/config/hazelcast-config-3.9.xsd
or the documentation at https://hazelcast.org/documentation/
-->
<!--suppress XmlDefaultAttributeValue -->
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.9.xsd"
xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<group>
@ -24,7 +30,7 @@
</group>
<management-center enabled="false">http://localhost:8080/mancenter</management-center>
<network>
<port auto-increment="true">5701</port>
<port auto-increment="true" port-count="100">5701</port>
<outbound-ports>
<!--
Allowed port range when connecting to other nodes.
@ -39,6 +45,9 @@
</multicast>
<tcp-ip enabled="false">
<interface>127.0.0.1</interface>
<member-list>
<member>127.0.0.1</member>
</member-list>
</tcp-ip>
<aws enabled="false">
<access-key>my-access-key</access-key>
@ -46,18 +55,20 @@
<!--optional, default is us-east-1 -->
<region>us-west-1</region>
<!--optional, default is ec2.amazonaws.com. If set, region shouldn't be set as it will override this property -->
<hostHeader>ec2.amazonaws.com</hostHeader>
<host-header>ec2.amazonaws.com</host-header>
<!-- optional, only instances belonging to this group will be discovered, default will try all running instances -->
<security-group-name>hazelcast-sg</security-group-name>
<tag-key>type</tag-key>
<tag-value>hz-nodes</tag-value>
</aws>
<discovery-strategies>
</discovery-strategies>
</join>
<interfaces enabled="false">
<interface>10.10.1.*</interface>
</interfaces>
<ssl enabled="false" />
<socket-interceptor enabled="false" />
<ssl enabled="false"/>
<socket-interceptor enabled="false"/>
<symmetric-encryption enabled="false">
<!--
encryption algorithm such as
@ -75,26 +86,12 @@
<!-- iteration count to use when generating the secret key -->
<iteration-count>19</iteration-count>
</symmetric-encryption>
<asymmetric-encryption enabled="false">
<!-- encryption algorithm -->
<algorithm>RSA/NONE/PKCS1PADDING</algorithm>
<!-- private key password -->
<keyPassword>thekeypass</keyPassword>
<!-- private key alias -->
<keyAlias>local</keyAlias>
<!-- key store type -->
<storeType>JKS</storeType>
<!-- key store password -->
<storePassword>thestorepass</storePassword>
<!-- path to the key store -->
<storePath>keystore</storePath>
</asymmetric-encryption>
</network>
<partition-group enabled="false"/>
<executor-service>
<core-pool-size>16</core-pool-size>
<max-pool-size>64</max-pool-size>
<keep-alive-seconds>60</keep-alive-seconds>
<executor-service name="default">
<pool-size>16</pool-size>
<!--Queue capacity. 0 means Integer.MAX_VALUE.-->
<queue-capacity>0</queue-capacity>
</executor-service>
<queue name="default">
<!--
@ -104,14 +101,31 @@
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size-per-jvm>0</max-size-per-jvm>
<max-size>0</max-size>
<!--
Name of the map configuration that will be used for the backing distributed
map for this queue.
Number of backups. If 1 is set as the backup-count for example,
then all entries of the map will be copied to another JVM for
fail-safety. 0 means no backup.
-->
<backing-map-ref>default</backing-map-ref>
<backup-count>1</backup-count>
<!--
Number of async backups. 0 means no backup.
-->
<async-backup-count>0</async-backup-count>
<empty-queue-ttl>-1</empty-queue-ttl>
</queue>
<map name="default">
<!--
Data type that will be used for storing recordMap.
Possible values:
BINARY (default): keys and values will be stored as binary data
OBJECT : values will be stored in their object forms
NATIVE : values will be stored in non-heap region of JVM
-->
<in-memory-format>BINARY</in-memory-format>
<!--
Number of backups. If 1 is set as the backup-count for example,
then all entries of the map will be copied to another JVM for
@ -150,14 +164,21 @@
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size policy="cluster_wide_map_size">1000</max-size>
<max-size policy="PER_NODE">1000</max-size>
<!--
When max. size is reached, specified percentage of
the map will be evicted. Any integer between 0 and 100.
If 25 is set for example, 25% of the entries will
get evicted.
`eviction-percentage` property is deprecated and will be ignored when it is set.
As of version 3.7, eviction mechanism changed.
It uses a probabilistic algorithm based on sampling. Please see documentation for further details
-->
<eviction-percentage>25</eviction-percentage>
<!--
`min-eviction-check-millis` property is deprecated and will be ignored when it is set.
As of version 3.7, eviction mechanism changed.
It uses a probabilistic algorithm based on sampling. Please see documentation for further details
-->
<min-eviction-check-millis>100</min-eviction-check-millis>
<!--
While recovering from split-brain (network partitioning),
map entries in the small cluster will merge into the bigger cluster
@ -165,32 +186,93 @@
cluster, there might an existing entry with the same key already.
Values of these entries might be different for that same key.
Which value should be set for the key? Conflict is resolved by
the policy set here. Default policy is hz.ADD_NEW_ENTRY
the policy set here. Default policy is PutIfAbsentMapMergePolicy
There are built-in merge policies such as
hz.NO_MERGE ; no entry will merge.
hz.ADD_NEW_ENTRY ; entry will be added if the merging entry's key
doesn't exist in the cluster.
hz.HIGHER_HITS ; entry with the higher hits wins.
hz.LATEST_UPDATE ; entry with the latest update wins.
com.hazelcast.map.merge.PassThroughMergePolicy; entry will be overwritten if merging entry exists for the key.
com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster.
com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins.
com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins.
-->
<merge-policy>hz.ADD_NEW_ENTRY</merge-policy>
</map>
<!-- Add your own semaphore configurations here:
<semaphore name="default">
<initial-permits>10</initial-permits>
<semaphore-factory enabled="true">
<class-name>com.acme.MySemaphoreFactory</class-name>
</semaphore-factory>
</semaphore>
-->
<merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
<!-- Add your own map merge policy implementations here:
<merge-policies>
<map-merge-policy name="MY_MERGE_POLICY">
<class-name>com.acme.MyOwnMergePolicy</class-name>
</map-merge-policy>
</merge-policies>
-->
<!--
Control caching of de-serialized values. Caching makes query evaluation faster, but it cost memory.
Possible Values:
NEVER: Never cache deserialized object
INDEX-ONLY: Caches values only when they are inserted into an index.
ALWAYS: Always cache deserialized values.
-->
<cache-deserialized-values>INDEX-ONLY</cache-deserialized-values>
</map>
<!--
Configuration for an event journal. The event journal keeps events related
to a specific partition and data structure. For instance, it could keep
map add, update, remove, merge events along with the key, old value, new value and so on.
-->
<event-journal enabled="false">
<mapName>mapName</mapName>
<capacity>10000</capacity>
<time-to-live-seconds>0</time-to-live-seconds>
</event-journal>
<event-journal enabled="false">
<cacheName>cacheName</cacheName>
<capacity>10000</capacity>
<time-to-live-seconds>0</time-to-live-seconds>
</event-journal>
<multimap name="default">
<backup-count>1</backup-count>
<value-collection-type>SET</value-collection-type>
</multimap>
<list name="default">
<backup-count>1</backup-count>
</list>
<set name="default">
<backup-count>1</backup-count>
</set>
<jobtracker name="default">
<max-thread-size>0</max-thread-size>
<!-- Queue size 0 means number of partitions * 2 -->
<queue-size>0</queue-size>
<retry-count>0</retry-count>
<chunk-size>1000</chunk-size>
<communicate-stats>true</communicate-stats>
<topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
</jobtracker>
<semaphore name="default">
<initial-permits>0</initial-permits>
<backup-count>1</backup-count>
<async-backup-count>0</async-backup-count>
</semaphore>
<reliable-topic name="default">
<read-batch-size>10</read-batch-size>
<topic-overload-policy>BLOCK</topic-overload-policy>
<statistics-enabled>true</statistics-enabled>
</reliable-topic>
<ringbuffer name="default">
<capacity>10000</capacity>
<backup-count>1</backup-count>
<async-backup-count>0</async-backup-count>
<time-to-live-seconds>0</time-to-live-seconds>
<in-memory-format>BINARY</in-memory-format>
</ringbuffer>
<serialization>
<portable-version>0</portable-version>
</serialization>
<services enable-defaults="true"/>
<lite-member enabled="false"/>
</hazelcast>

View File

@ -3,7 +3,7 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
<classpathentry kind="src" path="src"/>
<classpathentry kind="lib" path="hazelcast-3.5.3.jar"/>
<classpathentry kind="lib" path="hazelcast-cloud-3.5.3.jar"/>
<classpathentry kind="lib" path="hazelcast-3.9.3.jar"/>
<classpathentry kind="lib" path="hazelcast-aws-2.1.0.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>

View File

@ -26,7 +26,7 @@ Import-Package: javax.crypto,
org.w3c.dom
DynamicImport-Package: *
Bundle-ClassPath: .,
hazelcast-3.5.3.jar,
hazelcast-cloud-3.5.3.jar
hazelcast-3.9.3.jar,
hazelcast-aws-2.1.0.jar
Service-Component: OSGI-INF/clusterservice.xml, OSGI-INF/cacheservice.xml, OSGI-INF/messageservice.xml
Bundle-RequiredExecutionEnvironment: JavaSE-1.8

View File

@ -5,6 +5,6 @@ bin.includes = META-INF/,\
OSGI-INF/cacheservice.xml,\
OSGI-INF/messageservice.xml,\
OSGI-INF/,\
hazelcast-3.5.3.jar,\
hazelcast-cloud-3.5.3.jar
hazelcast-3.9.3.jar,\
hazelcast-aws-2.1.0.jar
source.. = src/

View File

@ -1,6 +1,6 @@
<project name="zklibrary" basedir="." default="copy">
<target name="copy">
<get src="${url.maven2.lib}/maven2/com/hazelcast/hazelcast/3.5.3/hazelcast-3.5.3.jar" dest="hazelcast-3.5.3.jar" usetimestamp="true" verbose="true" retries="5" />
<get src="${url.maven2.lib}/maven2/com/hazelcast/hazelcast-cloud/3.5.3/hazelcast-cloud-3.5.3.jar" dest="hazelcast-cloud-3.5.3.jar" usetimestamp="true" verbose="true" retries="5" />
<get src="${url.maven2.lib}/maven2/com/hazelcast/hazelcast/3.9.3/hazelcast-3.9.3.jar" dest="hazelcast-3.9.3.jar" usetimestamp="true" verbose="true" retries="5" />
<get src="${url.maven2.lib}/maven2/com/hazelcast/hazelcast-aws/2.1.0/hazelcast-aws-2.1.0.jar" dest="hazelcast-aws-2.1.0.jar" usetimestamp="true" verbose="true" retries="5" />
</target>
</project>

View File

@ -20,9 +20,6 @@ import java.net.URL;
import java.text.DateFormat;
import java.util.Date;
import java.util.Enumeration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.compiere.Adempiere;
import org.compiere.model.ServerStateChangeEvent;
@ -34,7 +31,8 @@ import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.UrlXmlConfig;
import com.hazelcast.core.*;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
/**
*
@ -50,7 +48,6 @@ public class Activator implements BundleActivator {
}
private volatile static HazelcastInstance hazelcastInstance;
private static Future<?> future;
/*
* (non-Javadoc)
@ -73,14 +70,19 @@ public class Activator implements BundleActivator {
}
private static synchronized void createHazelCastInstance() {
ScheduledThreadPoolExecutor executor = Adempiere.getThreadPoolExecutor();
future = executor.submit(new Runnable() {
@Override
public void run() {
File file = null;
//try idempiere home
String dataArea = System.getProperty("IDEMPIERE_HOME");
File file = null;
//try idempiere home
String dataArea = System.getProperty("IDEMPIERE_HOME");
if (dataArea != null && dataArea.trim().length() > 0) {
try {
file = new File(dataArea, "hazelcast.xml");
if (!file.exists())
file = null;
} catch (Exception e) {}
}
//try working directory
if (file == null) {
dataArea = System.getProperty("user.dir");
if (dataArea != null && dataArea.trim().length() > 0) {
try {
file = new File(dataArea, "hazelcast.xml");
@ -88,88 +90,68 @@ public class Activator implements BundleActivator {
file = null;
} catch (Exception e) {}
}
//try working directory
if (file == null) {
dataArea = System.getProperty("user.dir");
if (dataArea != null && dataArea.trim().length() > 0) {
try {
file = new File(dataArea, "hazelcast.xml");
if (!file.exists())
file = null;
} catch (Exception e) {}
}
}
//try osgi install area
if (file == null) {
dataArea = System.getProperty("osgi.install.area");
if (dataArea != null && dataArea.trim().length() > 0) {
try {
URL url = new URL(dataArea);
file = new File(url.getPath(), "hazelcast.xml");
if (!file.exists())
file = null;
} catch (Exception e) {}
}
}
//try hazelcast.config - to be consistent with hazelcast configuration documentation
if (file == null) {
dataArea = System.getProperty("hazelcast.config");
if (dataArea != null && dataArea.trim().length() > 0) {
try {
file = new File(dataArea);
if (!file.exists())
file = null;
} catch (Exception e) {}
}
}
if (file != null && file.exists()) {
}
//try osgi install area
if (file == null) {
dataArea = System.getProperty("osgi.install.area");
if (dataArea != null && dataArea.trim().length() > 0) {
try {
Config config = new FileSystemXmlConfig(file);
config.setClassLoader(getClass().getClassLoader());
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
MapConfig mc = config.getMapConfig("default");
if (mc != null) {
System.out.println("Hazelcast Max Size Config: "+mc.getMaxSizeConfig().getMaxSizePolicy() + " " + mc.getMaxSizeConfig().getSize());
}
return;
} catch (FileNotFoundException e) {}
}
Enumeration<URL> entries = getContext().getBundle().findEntries("/", "hazelcast.xml", false);
URL url = entries.hasMoreElements() ? entries.nextElement() : null;
if (url != null) {
try {
Config config = new UrlXmlConfig(url);
config.setClassLoader(getClass().getClassLoader());
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
MapConfig mc = config.getMapConfig("default");
if (mc != null) {
System.out.println("Hazelcast Max Size Config: "+mc.getMaxSizeConfig().getMaxSizePolicy() + " " + mc.getMaxSizeConfig().getSize());
}
return;
} catch (IOException e) {}
}
Config config = new Config();
config.setClassLoader(getClass().getClassLoader());
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
MapConfig mc = config.getMapConfig("default");
if (mc != null) {
System.out.println("Hazelcast Max Size Config: "+mc.getMaxSizeConfig().getMaxSizePolicy() + " " + mc.getMaxSizeConfig().getSize());
URL url = new URL(dataArea);
file = new File(url.getPath(), "hazelcast.xml");
if (!file.exists())
file = null;
} catch (Exception e) {}
}
}
});
//try hazelcast.config - to be consistent with hazelcast configuration documentation
if (file == null) {
dataArea = System.getProperty("hazelcast.config");
if (dataArea != null && dataArea.trim().length() > 0) {
try {
file = new File(dataArea);
if (!file.exists())
file = null;
} catch (Exception e) {}
}
}
if (file != null && file.exists()) {
try {
Config config = new FileSystemXmlConfig(file);
config.setClassLoader(Activator.class.getClassLoader());
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
MapConfig mc = config.getMapConfig("default");
if (mc != null) {
System.out.println("Hazelcast Max Size Config: "+mc.getMaxSizeConfig().getMaxSizePolicy() + " " + mc.getMaxSizeConfig().getSize());
}
return;
} catch (FileNotFoundException e) {}
}
Enumeration<URL> entries = getContext().getBundle().findEntries("/", "hazelcast.xml", false);
URL url = entries.hasMoreElements() ? entries.nextElement() : null;
if (url != null) {
try {
Config config = new UrlXmlConfig(url);
config.setClassLoader(Activator.class.getClassLoader());
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
MapConfig mc = config.getMapConfig("default");
if (mc != null) {
System.out.println("Hazelcast Max Size Config: "+mc.getMaxSizeConfig().getMaxSizePolicy() + " " + mc.getMaxSizeConfig().getSize());
}
return;
} catch (IOException e) {}
}
Config config = new Config();
config.setClassLoader(Activator.class.getClassLoader());
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
MapConfig mc = config.getMapConfig("default");
if (mc != null) {
System.out.println("Hazelcast Max Size Config: "+mc.getMaxSizeConfig().getMaxSizePolicy() + " " + mc.getMaxSizeConfig().getSize());
}
}
public static synchronized HazelcastInstance getHazelcastInstance() {
if (future != null && !future.isDone()) {
try {
future.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
}
if (hazelcastInstance != null) {
if (!hazelcastInstance.getLifecycleService().isRunning()) {
System.err.println(DateFormat.getDateTimeInstance().format(new Date()) + " Hazelcast instance is down!");
@ -200,9 +182,7 @@ public class Activator implements BundleActivator {
public void stop(BundleContext bundleContext) throws Exception {
Activator.context = null;
synchronized (Activator.class) {
if (future != null && !future.isDone()) {
future.cancel(true);
} else if (hazelcastInstance != null) {
if (hazelcastInstance != null) {
hazelcastInstance.getLifecycleService().shutdown();
hazelcastInstance = null;
}

View File

@ -17,6 +17,8 @@ import java.net.InetAddress;
import org.idempiere.distributed.IClusterMember;
import com.hazelcast.core.Member;
/**
* @author hengsin
*
@ -37,6 +39,12 @@ public class ClusterMember implements IClusterMember {
this.port = port;
}
public ClusterMember(Member member) {
this.id = member.getUuid();
this.address = member.getSocketAddress().getAddress();
this.port = member.getSocketAddress().getPort();
}
/* (non-Javadoc)
* @see org.idempiere.distributed.IClusterMember#getId()
*/

View File

@ -14,8 +14,8 @@
package org.idempiere.hazelcast.service;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@ -25,6 +25,7 @@ import org.idempiere.distributed.IClusterMember;
import org.idempiere.distributed.IClusterService;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;
/**
@ -43,7 +44,7 @@ public class ClusterServiceImpl implements IClusterService {
if (instance != null) {
Set<Member> members = instance.getCluster().getMembers();
for(Member member : members) {
clusterMembers.add(new ClusterMember(member.getUuid(), member.getSocketAddress().getAddress(), member.getSocketAddress().getPort()));
clusterMembers.add(new ClusterMember(member));
}
}
return clusterMembers;
@ -57,7 +58,7 @@ public class ClusterServiceImpl implements IClusterService {
HazelcastInstance instance = Activator.getHazelcastInstance();
if (instance != null) {
Member member = instance.getCluster().getLocalMember();
return new ClusterMember(member.getUuid(), member.getSocketAddress().getAddress(), member.getSocketAddress().getPort());
return new ClusterMember(member);
} else {
return null;
}
@ -73,7 +74,8 @@ public class ClusterServiceImpl implements IClusterService {
Set<Member> members = instance.getCluster().getMembers();
for(Member member : members) {
if (member.getUuid().equals(clusterMember.getId())) {
return Activator.getHazelcastInstance().getExecutorService("default").submitToMember(task, member);
IExecutorService service = Activator.getHazelcastInstance().getExecutorService("default");
return service.submitToMember(task, member);
}
}
}
@ -100,13 +102,13 @@ public class ClusterServiceImpl implements IClusterService {
}
}
if (selectedMembers.size() > 0) {
Map<Member, Future<V>> maps = Activator.getHazelcastInstance().getExecutorService("default").submitToMembers(task, selectedMembers);
Map<IClusterMember, Future<V>> result = new HashMap<IClusterMember, Future<V>>();
for(Member m : maps.keySet()) {
ClusterMember cm = new ClusterMember(m.getUuid(), m.getSocketAddress().getAddress(), m.getSocketAddress().getPort());
result.put(cm, maps.get(m));
IExecutorService service = Activator.getHazelcastInstance().getExecutorService("default");
Map<Member, Future<V>> map = service.submitToMembers(task, selectedMembers);
Map<IClusterMember, Future<V>> futureMap = new LinkedHashMap<>();
for(Member member : map.keySet()) {
futureMap.put(new ClusterMember(member), map.get(member));
}
return result;
return futureMap;
}
}
return null;

View File

@ -14,7 +14,9 @@
package org.idempiere.hazelcast.service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.idempiere.distributed.ITopic;
import org.idempiere.distributed.ITopicSubscriber;
@ -31,6 +33,7 @@ public class TopicImpl<E> implements ITopic<E> {
private com.hazelcast.core.ITopic<E> topic;
private List<TopicSubscriberAdapter<E>> adapters;
private Map<TopicSubscriberAdapter<E>, String> registrationMap;
/**
*
@ -38,6 +41,7 @@ public class TopicImpl<E> implements ITopic<E> {
public TopicImpl(com.hazelcast.core.ITopic<E> topic) {
this.topic = topic;
adapters = new ArrayList<TopicSubscriberAdapter<E>>();
registrationMap = new HashMap<>();
}
@Override
@ -48,8 +52,9 @@ public class TopicImpl<E> implements ITopic<E> {
@Override
public void subscribe(final ITopicSubscriber<E> subscriber) {
TopicSubscriberAdapter<E> adapter = new TopicSubscriberAdapter<E>(subscriber);
adapter.setListenerId(topic.addMessageListener(adapter));
String registrationId = topic.addMessageListener(adapter);
adapters.add(adapter);
registrationMap.put(adapter, registrationId);
}
@Override
@ -58,7 +63,9 @@ public class TopicImpl<E> implements ITopic<E> {
for(TopicSubscriberAdapter<E> adapter : adapters) {
if (adapter.subscriber == subscriber) {
found = adapter;
topic.removeMessageListener(adapter.getListenerId());
String registrationId = registrationMap.get(adapter);
if (topic.removeMessageListener(registrationId))
registrationMap.remove(adapter);
break;
}
}
@ -73,20 +80,11 @@ public class TopicImpl<E> implements ITopic<E> {
class TopicSubscriberAdapter<T> implements MessageListener<T> {
protected ITopicSubscriber<T> subscriber;
private String listenerId;
protected TopicSubscriberAdapter(ITopicSubscriber<T> subscriber) {
this.subscriber = subscriber;
}
public void setListenerId(String listenerId) {
this.listenerId = listenerId;
}
public String getListenerId() {
return listenerId;
}
@Override
public void onMessage(Message<T> message) {
subscriber.onMessage(message.getMessageObject());