IDEMPIERE-293 Support for Kill Session. Use dynamic service for access to IMessageService to avoid startup timing issue.

This commit is contained in:
Heng Sin Low 2013-02-14 21:07:31 +08:00
parent ac0d909eac
commit 8c7ba69090
4 changed files with 58 additions and 24 deletions

View File

@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="org.adempiere.base.broadcast.util">
<implementation class="org.idempiere.broadcast.BroadCastUtil"/>
<reference bind="bindMessageService" cardinality="1..1" interface="org.idempiere.distributed.IMessageService" name="IMessageService" policy="dynamic" unbind="unbindMessageService"/>
</scr:component>

View File

@ -26,7 +26,8 @@ bin.includes = META-INF/,\
OSGI-INF/defaultmodelvalidatorfactory.xml,\ OSGI-INF/defaultmodelvalidatorfactory.xml,\
OSGI-INF/defaultprocessfactory.xml,\ OSGI-INF/defaultprocessfactory.xml,\
OSGI-INF/defaultshipmentprocessorfactory.xml,\ OSGI-INF/defaultshipmentprocessorfactory.xml,\
OSGI-INF/defaultpaymentprocessorfactory.xml OSGI-INF/defaultpaymentprocessorfactory.xml,\
OSGI-INF/broadcastutil.xml
output.base.jar = build/ output.base.jar = build/
src.includes = schema/ src.includes = schema/
source.base.jar = src/ source.base.jar = src/

View File

@ -13,8 +13,9 @@
*****************************************************************************/ *****************************************************************************/
package org.idempiere.broadcast; package org.idempiere.broadcast;
import org.adempiere.base.IServiceHolder; import java.util.ArrayList;
import org.adempiere.base.Service; import java.util.List;
import org.idempiere.distributed.IMessageService; import org.idempiere.distributed.IMessageService;
import org.idempiere.distributed.ITopic; import org.idempiere.distributed.ITopic;
import org.idempiere.distributed.ITopicSubscriber; import org.idempiere.distributed.ITopicSubscriber;
@ -29,30 +30,39 @@ public class BroadCastUtil {
public static final int EVENT_SESSION_TIMEOUT =3; public static final int EVENT_SESSION_TIMEOUT =3;
public static final int EVENT_SESSION_ONNODE_TIMEOUT=4; public static final int EVENT_SESSION_ONNODE_TIMEOUT=4;
public static void subscribe(ITopicSubscriber<BroadCastMsg> subscriber){ private final static List<ITopicSubscriber<BroadCastMsg>> subscribers = new ArrayList<ITopicSubscriber<BroadCastMsg>>();
private static IMessageService service = null;
IServiceHolder<IMessageService> holder = Service.locator().locate(IMessageService.class);
IMessageService service = holder.getService(); /**
*
* @param subscriber
*/
public static synchronized void subscribe(ITopicSubscriber<BroadCastMsg> subscriber){
subscribers.add(subscriber);
if (service != null) { if (service != null) {
ITopic<BroadCastMsg> topic= service.getTopic(TOPIC_BROADCAST_MESSAGE); ITopic<BroadCastMsg> topic= service.getTopic(TOPIC_BROADCAST_MESSAGE);
topic.subscribe(subscriber); topic.subscribe(subscriber);
} }
} }
public static void unSubscribe(ITopicSubscriber<BroadCastMsg> subscriber){ /**
*
IServiceHolder<IMessageService> holder = Service.locator().locate(IMessageService.class); * @param subscriber
IMessageService service = holder.getService(); */
public static synchronized void unSubscribe(ITopicSubscriber<BroadCastMsg> subscriber){
subscribers.remove(subscriber);
if (service != null) { if (service != null) {
ITopic<BroadCastMsg> topic= service.getTopic(TOPIC_BROADCAST_MESSAGE); ITopic<BroadCastMsg> topic= service.getTopic(TOPIC_BROADCAST_MESSAGE);
topic.unsubscribe(subscriber); topic.unsubscribe(subscriber);
} }
} }
public static boolean publish(BroadCastMsg msg){ /**
*
IServiceHolder<IMessageService> holder = Service.locator().locate(IMessageService.class); * @param msg
IMessageService service = holder.getService(); * @return true if publish successfully
*/
public static synchronized boolean publish(BroadCastMsg msg){
if (service != null) { if (service != null) {
ITopic<BroadCastMsg> topic= service.getTopic(TOPIC_BROADCAST_MESSAGE); ITopic<BroadCastMsg> topic= service.getTopic(TOPIC_BROADCAST_MESSAGE);
topic.publish(msg); topic.publish(msg);
@ -60,4 +70,28 @@ public class BroadCastUtil {
} }
return false; return false;
} }
/**
*
* @param messageService
*/
public void bindMessageService(IMessageService messageService) {
synchronized (BroadCastUtil.class) {
service = messageService;
for (ITopicSubscriber<BroadCastMsg> subscriber : subscribers) {
ITopic<BroadCastMsg> topic= service.getTopic(TOPIC_BROADCAST_MESSAGE);
topic.subscribe(subscriber);
}
}
}
/**
*
* @param messageService
*/
public void unbindMessageService(IMessageService messageService) {
synchronized (BroadCastUtil.class) {
service = null;
}
}
} }

View File

@ -26,15 +26,9 @@ import org.idempiere.distributed.ITopicSubscriber;
*/ */
public class ZKBroadCastManager implements ITopicSubscriber<BroadCastMsg>{ public class ZKBroadCastManager implements ITopicSubscriber<BroadCastMsg>{
private static ZKBroadCastManager broadCastMgr = null; private final static ZKBroadCastManager broadCastMgr = new ZKBroadCastManager();
public static ZKBroadCastManager getBroadCastMgr() { public static ZKBroadCastManager getBroadCastMgr() {
synchronized (ZKBroadCastManager.class) {
if(broadCastMgr==null)
broadCastMgr= new ZKBroadCastManager();
}
return broadCastMgr; return broadCastMgr;
} }