Merge a1bbc19ffefa

This commit is contained in:
Heng Sin Low 2012-12-04 22:05:58 +08:00
commit 7250ef3117
9 changed files with 86 additions and 106 deletions

View File

@ -32,16 +32,6 @@ public class BroadCastMsg implements Serializable {
private String target;
private int eventId;
private boolean fromCluster = false;
public boolean isFromCluster() {
return fromCluster;
}
public void setFromCluster(boolean fromCluster) {
this.fromCluster = fromCluster;
}
public int getEventId() {
return eventId;
}

View File

@ -49,13 +49,15 @@ public class BroadCastUtil {
}
}
public static void publish(BroadCastMsg msg){
public static boolean publish(BroadCastMsg msg){
IServiceHolder<IMessageService> holder = Service.locator().locate(IMessageService.class);
IMessageService service = holder.getService();
if (service != null) {
ITopic<BroadCastMsg> topic= service.getTopic(TOPIC_BROADCAST_MESSAGE);
topic.publish(msg);
return true;
}
return false;
}
}

View File

@ -17,16 +17,20 @@ package org.idempiere.broadcast;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.adempiere.base.event.EventManager;
import org.adempiere.base.event.IEventTopics;
import org.adempiere.exceptions.DBException;
import org.adempiere.model.MBroadcastMessage;
import org.compiere.Adempiere;
import org.compiere.model.MNote;
import org.compiere.util.CLogger;
import org.compiere.util.DB;
import org.compiere.util.Env;
import org.compiere.util.WebUtil;
/**
*
@ -48,6 +52,7 @@ public class BroadcastMsgUtil
* @param trxName
*/
public static void publishBroadcastMessage(final int messageID, String trxName) {
MBroadcastMessage mbMessage = MBroadcastMessage.get(Env.getCtx(), messageID);
String broadcastType = mbMessage.getBroadcastType();
@ -89,26 +94,37 @@ public class BroadcastMsgUtil
BroadCastMsg msg = new BroadCastMsg();
msg.setIntData(messageID);
msg.setEventId(BroadCastUtil.EVENT_BROADCAST_MESSAGE);
pushToQueue(msg);
pushToQueue(msg,false);
}
}
public static void pushToQueue(final BroadCastMsg msg){
Runnable runnable = new Runnable() {
@Override
public void run() {
public static void pushToQueue(final BroadCastMsg msg, boolean isLocalOnly) {
org.osgi.service.event.Event event = EventManager.newEvent(IEventTopics.BROADCAST_MESSAGE, msg);
EventManager.getInstance().postEvent(event);
}
};
Thread thread = new Thread(runnable);
thread.setName("PublishMessage -" + Env.getContextAsInt(Env.getCtx(), "AD_Session_ID"));
thread.start();
boolean isPublished = false;
if (!isLocalOnly) {
msg.setSrc(WebUtil.getServerName());
isPublished = BroadCastUtil.publish(msg);
}
if (!isPublished) {
Runnable runnable = new Runnable() {
@Override
public void run() {
org.osgi.service.event.Event event = EventManager.newEvent(
IEventTopics.BROADCAST_MESSAGE, msg);
EventManager.getInstance().postEvent(event);
}
};
ScheduledThreadPoolExecutor executer = Adempiere
.getThreadPoolExecutor();
executer.schedule(runnable, 0, TimeUnit.MILLISECONDS);
}
}
/**
* Test message
@ -118,11 +134,10 @@ public class BroadcastMsgUtil
public static void testBroadcastMessage(int messageID, int AD_Session_ID) {
BroadCastMsg msg = new BroadCastMsg();
msg.setIntData(messageID);
msg.setFromCluster(true);
msg.setEventId(BroadCastUtil.EVENT_TEST_BROADCAST_MESSAGE);
msg.setTarget(Integer.toString(AD_Session_ID));
pushToQueue(msg);
pushToQueue(msg,true);
}

File diff suppressed because one or more lines are too long

View File

@ -325,8 +325,7 @@ public class DefaultDesktop extends TabbedDesktop implements MenuListener, Seria
* @param eventManager
*/
public void bindEventManager() {
String topics [] = {IEventTopics.BROADCAST_MESSAGE};
EventManager.getInstance().register(topics, this);
EventManager.getInstance().register(IEventTopics.BROADCAST_MESSAGE, this);
}
/**

View File

@ -14,20 +14,17 @@
package org.adempiere.webui.event;
import org.adempiere.base.event.EventManager;
import org.adempiere.base.event.IEventTopics;
import org.compiere.util.WebUtil;
import org.idempiere.broadcast.BroadCastMsg;
import org.idempiere.broadcast.BroadCastUtil;
import org.idempiere.broadcast.BroadcastMsgUtil;
import org.idempiere.distributed.ITopicSubscriber;
import org.osgi.service.event.EventHandler;
/**
* Class Manages Broadcast Messages across webui cluster
* @author Deepak Pansheriya
*
*/
public class ZKBroadCastManager implements ITopicSubscriber<BroadCastMsg>,EventHandler{
public class ZKBroadCastManager implements ITopicSubscriber<BroadCastMsg>{
private static ZKBroadCastManager broadCastMgr = null;
@ -42,7 +39,6 @@ public class ZKBroadCastManager implements ITopicSubscriber<BroadCastMsg>,EventH
}
private ZKBroadCastManager(){
EventManager.getInstance().register(IEventTopics.BROADCAST_MESSAGE, this);
BroadCastUtil.subscribe(this);
}
@ -52,41 +48,19 @@ public class ZKBroadCastManager implements ITopicSubscriber<BroadCastMsg>,EventH
*/
@Override
public void onMessage(BroadCastMsg message) {
if(!WebUtil.getServerName().equalsIgnoreCase(message.getSrc())){
switch(message.getEventId()){
case BroadCastUtil.EVENT_BROADCAST_MESSAGE:
message.setFromCluster(true);
BroadcastMsgUtil.pushToQueue(message);
break;
case BroadCastUtil.EVENT_SESSION_TIMEOUT:
message.setFromCluster(true);
BroadcastMsgUtil.pushToQueue(message);
break;
case BroadCastUtil.EVENT_SESSION_ONNODE_TIMEOUT:
if(WebUtil.getServerName().equalsIgnoreCase(message.getTarget())){
message.setFromCluster(true);
BroadcastMsgUtil.pushToQueue(message);
}
break;
}
}
}
/**
* OSGI Event Handler
*/
@Override
public void handleEvent(org.osgi.service.event.Event event) {
BroadCastMsg msg = (BroadCastMsg) event
.getProperty(EventManager.EVENT_DATA);
// Avoid loop
if (msg.isFromCluster())
return;
if (msg.getEventId() == BroadCastUtil.EVENT_BROADCAST_MESSAGE) {
msg.setSrc(WebUtil.getServerName());
msg.setEventId(BroadCastUtil.EVENT_BROADCAST_MESSAGE);
BroadCastUtil.publish(msg);
switch (message.getEventId()) {
case BroadCastUtil.EVENT_BROADCAST_MESSAGE:
BroadcastMsgUtil.pushToQueue(message, true);
break;
case BroadCastUtil.EVENT_SESSION_TIMEOUT:
BroadcastMsgUtil.pushToQueue(message, true);
break;
case BroadCastUtil.EVENT_SESSION_ONNODE_TIMEOUT:
if (WebUtil.getServerName().equalsIgnoreCase(message.getTarget())) {
BroadcastMsgUtil.pushToQueue(message, true);
}
break;
}
}
}

View File

@ -14,6 +14,7 @@
package org.adempiere.webui.panel;
import org.adempiere.webui.session.SessionManager;
import org.zkoss.zhtml.Script;
import org.zkoss.zk.ui.Executions;
import org.zkoss.zk.ui.Session;
@ -140,8 +141,7 @@ public class TimeoutPanel extends Window implements
if (event.getName().equals("onTimer"))
{
timer.stop();
Session session = Sessions.getCurrent();
session.invalidate();
SessionManager.logoutSession();
Executions.sendRedirect("/index.zul");
}
}

View File

@ -65,7 +65,7 @@ public class KillAllSession extends SvrProcess {
msg.setEventId(BroadCastUtil.EVENT_SESSION_ONNODE_TIMEOUT);
msg.setIntData(scndTimeout);
msg.setTarget(rs.getString("servername"));
BroadcastMsgUtil.pushToQueue(msg);
BroadcastMsgUtil.pushToQueue(msg,false);
}
} catch (Exception e) {
logger.log(Level.SEVERE, "servername could not be retrieved", e);

View File

@ -48,7 +48,7 @@ public class KillCurrentSession extends SvrProcess {
msg.setEventId(BroadCastUtil.EVENT_SESSION_TIMEOUT);
msg.setIntData(scndTimeout);
msg.setTarget(Integer.toString(getRecord_ID()));
BroadcastMsgUtil.pushToQueue(msg);
BroadcastMsgUtil.pushToQueue(msg,false);
return "Session notified";
}