diff --git a/org.adempiere.base/src/org/idempiere/broadcast/BroadCastMsg.java b/org.adempiere.base/src/org/idempiere/broadcast/BroadCastMsg.java index c37b35598b..073c9a6d08 100644 --- a/org.adempiere.base/src/org/idempiere/broadcast/BroadCastMsg.java +++ b/org.adempiere.base/src/org/idempiere/broadcast/BroadCastMsg.java @@ -32,16 +32,6 @@ public class BroadCastMsg implements Serializable { private String target; private int eventId; - private boolean fromCluster = false; - - public boolean isFromCluster() { - return fromCluster; - } - - public void setFromCluster(boolean fromCluster) { - this.fromCluster = fromCluster; - } - public int getEventId() { return eventId; } diff --git a/org.adempiere.base/src/org/idempiere/broadcast/BroadCastUtil.java b/org.adempiere.base/src/org/idempiere/broadcast/BroadCastUtil.java index fe71bbb52c..933074a2ed 100644 --- a/org.adempiere.base/src/org/idempiere/broadcast/BroadCastUtil.java +++ b/org.adempiere.base/src/org/idempiere/broadcast/BroadCastUtil.java @@ -49,13 +49,15 @@ public class BroadCastUtil { } } - public static void publish(BroadCastMsg msg){ + public static boolean publish(BroadCastMsg msg){ IServiceHolder holder = Service.locator().locate(IMessageService.class); IMessageService service = holder.getService(); if (service != null) { ITopic topic= service.getTopic(TOPIC_BROADCAST_MESSAGE); topic.publish(msg); + return true; } + return false; } } diff --git a/org.adempiere.base/src/org/idempiere/broadcast/BroadcastMsgUtil.java b/org.adempiere.base/src/org/idempiere/broadcast/BroadcastMsgUtil.java index 9b2f582a28..3a08aed62d 100644 --- a/org.adempiere.base/src/org/idempiere/broadcast/BroadcastMsgUtil.java +++ b/org.adempiere.base/src/org/idempiere/broadcast/BroadcastMsgUtil.java @@ -17,16 +17,20 @@ package org.idempiere.broadcast; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.adempiere.base.event.EventManager; import org.adempiere.base.event.IEventTopics; import org.adempiere.exceptions.DBException; import org.adempiere.model.MBroadcastMessage; +import org.compiere.Adempiere; import org.compiere.model.MNote; import org.compiere.util.CLogger; import org.compiere.util.DB; import org.compiere.util.Env; +import org.compiere.util.WebUtil; /** * @@ -48,6 +52,7 @@ public class BroadcastMsgUtil * @param trxName */ public static void publishBroadcastMessage(final int messageID, String trxName) { + MBroadcastMessage mbMessage = MBroadcastMessage.get(Env.getCtx(), messageID); String broadcastType = mbMessage.getBroadcastType(); @@ -89,26 +94,37 @@ public class BroadcastMsgUtil BroadCastMsg msg = new BroadCastMsg(); msg.setIntData(messageID); msg.setEventId(BroadCastUtil.EVENT_BROADCAST_MESSAGE); - pushToQueue(msg); + + pushToQueue(msg,false); } } - public static void pushToQueue(final BroadCastMsg msg){ - Runnable runnable = new Runnable() { - - @Override - public void run() { + public static void pushToQueue(final BroadCastMsg msg, boolean isLocalOnly) { - org.osgi.service.event.Event event = EventManager.newEvent(IEventTopics.BROADCAST_MESSAGE, msg); - EventManager.getInstance().postEvent(event); - - } - }; - - Thread thread = new Thread(runnable); - thread.setName("PublishMessage -" + Env.getContextAsInt(Env.getCtx(), "AD_Session_ID")); - thread.start(); + boolean isPublished = false; + if (!isLocalOnly) { + msg.setSrc(WebUtil.getServerName()); + isPublished = BroadCastUtil.publish(msg); + + } + + if (!isPublished) { + Runnable runnable = new Runnable() { + + @Override + public void run() { + + org.osgi.service.event.Event event = EventManager.newEvent( + IEventTopics.BROADCAST_MESSAGE, msg); + EventManager.getInstance().postEvent(event); + } + }; + + ScheduledThreadPoolExecutor executer = Adempiere + .getThreadPoolExecutor(); + executer.schedule(runnable, 0, TimeUnit.MILLISECONDS); + } } /** * Test message @@ -118,11 +134,10 @@ public class BroadcastMsgUtil public static void testBroadcastMessage(int messageID, int AD_Session_ID) { BroadCastMsg msg = new BroadCastMsg(); msg.setIntData(messageID); - msg.setFromCluster(true); msg.setEventId(BroadCastUtil.EVENT_TEST_BROADCAST_MESSAGE); msg.setTarget(Integer.toString(AD_Session_ID)); - pushToQueue(msg); + pushToQueue(msg,true); } diff --git a/org.adempiere.server-feature/server.product.launch b/org.adempiere.server-feature/server.product.launch index dbe6f298e3..26de9579b0 100644 --- a/org.adempiere.server-feature/server.product.launch +++ b/org.adempiere.server-feature/server.product.launch @@ -1,33 +1,33 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/desktop/DefaultDesktop.java b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/desktop/DefaultDesktop.java index 909b64811a..c0a4ff773e 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/desktop/DefaultDesktop.java +++ b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/desktop/DefaultDesktop.java @@ -325,8 +325,7 @@ public class DefaultDesktop extends TabbedDesktop implements MenuListener, Seria * @param eventManager */ public void bindEventManager() { - String topics [] = {IEventTopics.BROADCAST_MESSAGE}; - EventManager.getInstance().register(topics, this); + EventManager.getInstance().register(IEventTopics.BROADCAST_MESSAGE, this); } /** diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/event/ZKBroadCastManager.java b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/event/ZKBroadCastManager.java index 66484c709f..1a5b96d135 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/event/ZKBroadCastManager.java +++ b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/event/ZKBroadCastManager.java @@ -14,20 +14,17 @@ package org.adempiere.webui.event; -import org.adempiere.base.event.EventManager; -import org.adempiere.base.event.IEventTopics; import org.compiere.util.WebUtil; import org.idempiere.broadcast.BroadCastMsg; import org.idempiere.broadcast.BroadCastUtil; import org.idempiere.broadcast.BroadcastMsgUtil; import org.idempiere.distributed.ITopicSubscriber; -import org.osgi.service.event.EventHandler; /** * Class Manages Broadcast Messages across webui cluster * @author Deepak Pansheriya * */ -public class ZKBroadCastManager implements ITopicSubscriber,EventHandler{ +public class ZKBroadCastManager implements ITopicSubscriber{ private static ZKBroadCastManager broadCastMgr = null; @@ -42,7 +39,6 @@ public class ZKBroadCastManager implements ITopicSubscriber,EventH } private ZKBroadCastManager(){ - EventManager.getInstance().register(IEventTopics.BROADCAST_MESSAGE, this); BroadCastUtil.subscribe(this); } @@ -52,41 +48,19 @@ public class ZKBroadCastManager implements ITopicSubscriber,EventH */ @Override public void onMessage(BroadCastMsg message) { - - if(!WebUtil.getServerName().equalsIgnoreCase(message.getSrc())){ - switch(message.getEventId()){ - case BroadCastUtil.EVENT_BROADCAST_MESSAGE: - message.setFromCluster(true); - BroadcastMsgUtil.pushToQueue(message); - break; - case BroadCastUtil.EVENT_SESSION_TIMEOUT: - message.setFromCluster(true); - BroadcastMsgUtil.pushToQueue(message); - break; - case BroadCastUtil.EVENT_SESSION_ONNODE_TIMEOUT: - if(WebUtil.getServerName().equalsIgnoreCase(message.getTarget())){ - message.setFromCluster(true); - BroadcastMsgUtil.pushToQueue(message); - } - break; - } - } - } - /** - * OSGI Event Handler - */ - @Override - public void handleEvent(org.osgi.service.event.Event event) { - BroadCastMsg msg = (BroadCastMsg) event - .getProperty(EventManager.EVENT_DATA); - // Avoid loop - if (msg.isFromCluster()) - return; - if (msg.getEventId() == BroadCastUtil.EVENT_BROADCAST_MESSAGE) { - msg.setSrc(WebUtil.getServerName()); - msg.setEventId(BroadCastUtil.EVENT_BROADCAST_MESSAGE); - BroadCastUtil.publish(msg); + switch (message.getEventId()) { + case BroadCastUtil.EVENT_BROADCAST_MESSAGE: + BroadcastMsgUtil.pushToQueue(message, true); + break; + case BroadCastUtil.EVENT_SESSION_TIMEOUT: + BroadcastMsgUtil.pushToQueue(message, true); + break; + case BroadCastUtil.EVENT_SESSION_ONNODE_TIMEOUT: + if (WebUtil.getServerName().equalsIgnoreCase(message.getTarget())) { + BroadcastMsgUtil.pushToQueue(message, true); + } + break; } } } diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/panel/TimeoutPanel.java b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/panel/TimeoutPanel.java index ca3ccdfc57..03f5861b42 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/panel/TimeoutPanel.java +++ b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/panel/TimeoutPanel.java @@ -14,6 +14,7 @@ package org.adempiere.webui.panel; +import org.adempiere.webui.session.SessionManager; import org.zkoss.zhtml.Script; import org.zkoss.zk.ui.Executions; import org.zkoss.zk.ui.Session; @@ -140,8 +141,7 @@ public class TimeoutPanel extends Window implements if (event.getName().equals("onTimer")) { timer.stop(); - Session session = Sessions.getCurrent(); - session.invalidate(); + SessionManager.logoutSession(); Executions.sendRedirect("/index.zul"); } } diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/process/KillAllSession.java b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/process/KillAllSession.java index 7ab352ad03..bfadfc6f41 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/process/KillAllSession.java +++ b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/process/KillAllSession.java @@ -65,7 +65,7 @@ public class KillAllSession extends SvrProcess { msg.setEventId(BroadCastUtil.EVENT_SESSION_ONNODE_TIMEOUT); msg.setIntData(scndTimeout); msg.setTarget(rs.getString("servername")); - BroadcastMsgUtil.pushToQueue(msg); + BroadcastMsgUtil.pushToQueue(msg,false); } } catch (Exception e) { logger.log(Level.SEVERE, "servername could not be retrieved", e); diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/process/KillCurrentSession.java b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/process/KillCurrentSession.java index efac641b50..cd3924ba23 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/process/KillCurrentSession.java +++ b/org.adempiere.ui.zk/WEB-INF/src/org/adempiere/webui/process/KillCurrentSession.java @@ -48,7 +48,7 @@ public class KillCurrentSession extends SvrProcess { msg.setEventId(BroadCastUtil.EVENT_SESSION_TIMEOUT); msg.setIntData(scndTimeout); msg.setTarget(Integer.toString(getRecord_ID())); - BroadcastMsgUtil.pushToQueue(msg); + BroadcastMsgUtil.pushToQueue(msg,false); return "Session notified"; }