diff --git a/org.adempiere.ui.zk/META-INF/MANIFEST.MF b/org.adempiere.ui.zk/META-INF/MANIFEST.MF index b6a27d5fe0..fed8dc5dde 100644 --- a/org.adempiere.ui.zk/META-INF/MANIFEST.MF +++ b/org.adempiere.ui.zk/META-INF/MANIFEST.MF @@ -14,6 +14,8 @@ Import-Package: groovy.transform.stc;version="2.4.7", javax.servlet.http, javax.servlet.jsp.resources;version="2.3.0", javax.servlet.resources;version="3.1.0", + javax.websocket;version="1.1.0", + javax.websocket.server;version="1.1.0", javax.xml.bind;version="2.3.0", net.sf.jasperreports.engine, net.sf.jasperreports.engine.export, @@ -32,6 +34,8 @@ Import-Package: groovy.transform.stc;version="2.4.7", org.apache.tools.ant.taskdefs, org.compiere.css, org.eclipse.core.runtime;version="3.4.0", + org.eclipse.jetty.websocket.jsr356;version="9.4.12", + org.eclipse.jetty.websocket.jsr356.server;version="9.4.12", org.jfree.chart, org.jfree.chart.encoders, org.jfree.chart.entity, diff --git a/org.adempiere.ui.zk/WEB-INF/src/metainfo/zk/lang-addon.xml b/org.adempiere.ui.zk/WEB-INF/src/metainfo/zk/lang-addon.xml index d5e98d7741..7a46ecca0d 100644 --- a/org.adempiere.ui.zk/WEB-INF/src/metainfo/zk/lang-addon.xml +++ b/org.adempiere.ui.zk/WEB-INF/src/metainfo/zk/lang-addon.xml @@ -45,5 +45,6 @@ Copyright (C) 2007 Ashley G Ramdass (ADempiere WebUI). + diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/idempiere/ui/zk/websocket/ServerPushEndPoint.java b/org.adempiere.ui.zk/WEB-INF/src/org/idempiere/ui/zk/websocket/ServerPushEndPoint.java new file mode 100644 index 0000000000..433aff462a --- /dev/null +++ b/org.adempiere.ui.zk/WEB-INF/src/org/idempiere/ui/zk/websocket/ServerPushEndPoint.java @@ -0,0 +1,100 @@ +/*********************************************************************** + * This file is part of iDempiere ERP Open Source * + * http://www.idempiere.org * + * * + * Copyright (C) Contributors * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software * + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, * + * MA 02110-1301, USA. * + * * + * Contributors: * + * - hengsin * + **********************************************************************/ +package org.idempiere.ui.zk.websocket; + +import java.io.IOException; +import java.util.logging.Level; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + +import org.compiere.util.CLogger; +import org.compiere.util.Util; + +/** + * @author hengsin + * + */ +@ServerEndpoint(value="/serverpush/{dtid}") +public class ServerPushEndPoint { + + private Session session; + private String dtid; + + /** + * + */ + public ServerPushEndPoint() { + } + + + @OnClose + public void onClose(Session sess) throws IOException { + if (this.session != null) { + this.session = null; + WebSocketServerPush.unregisterEndPoint(dtid); + } + } + + + @OnOpen + public void onOpen(Session sess, @PathParam("dtid") String dtid) throws IOException { + if (!Util.isEmpty(dtid, true)) { + session = sess; + session.setMaxIdleTimeout(30000); + this.dtid = dtid; + WebSocketServerPush.registerEndPoint(dtid, this); + } + } + + + @OnError + public void onError(Session sess, Throwable throwable) { + CLogger.getCLogger(getClass()).log(Level.WARNING, throwable.getMessage(), throwable); + } + + + @OnMessage + public void onMessage(Session session, String message) { + } + + /** + * Message client to send echo event to server + */ + public void echo() { + if (session != null) { + try { + session.getBasicRemote().sendText("echo"); + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} diff --git a/org.adempiere.ui.zk/WEB-INF/src/org/idempiere/ui/zk/websocket/WebSocketServerPush.java b/org.adempiere.ui.zk/WEB-INF/src/org/idempiere/ui/zk/websocket/WebSocketServerPush.java new file mode 100644 index 0000000000..61b52fe9e4 --- /dev/null +++ b/org.adempiere.ui.zk/WEB-INF/src/org/idempiere/ui/zk/websocket/WebSocketServerPush.java @@ -0,0 +1,297 @@ +/*********************************************************************** + * This file is part of iDempiere ERP Open Source * + * http://www.idempiere.org * + * * + * Copyright (C) Contributors * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software * + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, * + * MA 02110-1301, USA. * + * * + * Contributors: * + * - hengsin * + **********************************************************************/ +package org.idempiere.ui.zk.websocket; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zkoss.zk.au.out.AuScript; +import org.zkoss.zk.ui.Desktop; +import org.zkoss.zk.ui.DesktopUnavailableException; +import org.zkoss.zk.ui.Executions; +import org.zkoss.zk.ui.UiException; +import org.zkoss.zk.ui.event.Event; +import org.zkoss.zk.ui.event.EventListener; +import org.zkoss.zk.ui.impl.ExecutionCarryOver; +import org.zkoss.zk.ui.sys.Scheduler; +import org.zkoss.zk.ui.sys.ServerPush; +import org.zkoss.zk.ui.util.Clients; + +/** + * + * @author hengsin + * + */ +public class WebSocketServerPush implements ServerPush { + + private static final String ON_ACTIVATE_DESKTOP = "onActivateDesktop"; + + private final AtomicReference desktop = new AtomicReference(); + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private ThreadInfo _active; + private ExecutionCarryOver _carryOver; + private final Object _mutex = new Object(); + + private final static Map endPointMap = new ConcurrentHashMap<>(); + private List> schedules = new ArrayList<>(); + + public WebSocketServerPush() { + } + + @Override + public boolean activate(long timeout) throws InterruptedException, DesktopUnavailableException { + final Thread curr = Thread.currentThread(); + if (_active != null && _active.thread.equals(curr)) { //re-activate + ++_active.nActive; + return true; + } + + final ThreadInfo info = new ThreadInfo(curr); + + EventListener task = new EventListener() { + @Override + public void onEvent(Event event) throws Exception { + if (event.getName().equals(ON_ACTIVATE_DESKTOP)) + { + synchronized (_mutex) { + _carryOver = new ExecutionCarryOver(desktop.get()); + + synchronized (info) { + info.nActive = 1; //granted + info.notifyAll(); + } + + try { + _mutex.wait(); //wait until the server push is done + } catch (InterruptedException ex) { + throw UiException.Aide.wrap(ex); + } + } + } + } + }; + + synchronized (info) { + Executions.schedule(desktop.get(), task, new Event(ON_ACTIVATE_DESKTOP)); + if (info.nActive == 0) + info.wait(timeout <= 0 ? 10*60*1000: timeout); + } + + _carryOver.carryOver(); + _active = info; + + return true; + } + + private boolean echo() { + Desktop dt = desktop.get(); + if (dt != null) { + ServerPushEndPoint endPoint = getEndPoint(dt.getId()); + if (endPoint == null) { + if (dt.isServerPushEnabled()) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + endPoint = getEndPoint(dt.getId()); + } + } + if (endPoint != null) { + endPoint.echo(); + return true; + } + } + return false; + } + + @Override + public boolean deactivate(boolean stop) { + boolean stopped = false; + if (_active != null && Thread.currentThread().equals(_active.thread)) { + if (--_active.nActive <= 0) { + if (stop) + { + stop(); + stopped = true; + } + + _carryOver.cleanup(); + _carryOver = null; + _active.nActive = 0; //just in case + _active = null; + + synchronized (_mutex) { + _mutex.notifyAll(); + } + } + } + return stopped; + } + + @Override + public boolean isActive() { + return _active != null && _active.nActive > 0; + } + + @SuppressWarnings("unchecked") + @Override + public void onPiggyback() { + Schedule[] pendings = null; + synchronized (schedules) { + if (!schedules.isEmpty()) { + pendings = schedules.toArray(new Schedule[0]); + schedules = new ArrayList<>(); + } + } + if (pendings != null && pendings.length > 0) { + for(Schedule p : pendings) { + p.scheduler.schedule(p.task, p.event); + } + } + + //check web socket end point + Desktop dt = desktop.get(); + if (dt != null) { + ServerPushEndPoint endPoint = getEndPoint(dt.getId()); + if (endPoint == null) { + if (dt.isServerPushEnabled()) { + startServerPushAtClient(dt); + } + } + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void schedule(EventListener task, T event, + Scheduler scheduler) { + if (Executions.getCurrent() == null) { + //save for schedule at on piggyback event + synchronized (schedules) { + schedules.add(new Schedule(task, event, scheduler)); + } + echo(); + } else { + //in event listener thread, can schedule immediately + scheduler.schedule(task, event); + } + } + + private class Schedule { + private EventListener task; + private T event; + private Scheduler scheduler; + + private Schedule(EventListener task, T event, Scheduler scheduler) { + this.task = task; + this.event = event; + this.scheduler = scheduler; + } + } + + @Override + public void start(Desktop desktop) { + Desktop oldDesktop = this.desktop.getAndSet(desktop); + if (oldDesktop != null) { + log.warn("Server push already started for desktop " + desktop.getId()); + return; + } + + log.debug("Starting server push for " + desktop); + startServerPushAtClient(desktop); + } + + private void startServerPushAtClient(Desktop desktop) { + Clients.response("org.idempiere.websocket.serverpush.start", new AuScript(null, "org.idempiere.websocket.startServerPush('" + desktop.getId() + "');")); + } + + @Override + public void stop() { + Desktop desktop = this.desktop.getAndSet(null); + if (desktop == null) { + log.warn("Server push hasn't been started or has already stopped"); + return; + } + + log.debug("Stopping server push for " + desktop); + Clients.response("org.idempiere.websocket.serverpush.stop", new AuScript(null, "org.idempiere.websocket.stopServerPush('" + desktop.getId() + "');")); + } + + private static class ThreadInfo { + private final Thread thread; + /** # of activate() was called. */ + private int nActive; + private ThreadInfo(Thread thread) { + this.thread = thread; + } + public String toString() { + return "[" + thread + ',' + nActive + ']'; + } + } + + @Override + public void resume() { + if (this.desktop.get() != null) { + Desktop desktop = this.desktop.getAndSet(null); + start(desktop); + } + } + + /** + * Register web socket end point for desktop + * @param dtid Desktop id + * @param endpoint Connected web socket end point + */ + public static void registerEndPoint(String dtid, ServerPushEndPoint endpoint) { + endPointMap.put(dtid, endpoint); + } + + /** + * Remove web socket end point for desktop + * @param dtid Desktop id + * @return true if there's end point register previously for desktop, false otherwise + */ + public static boolean unregisterEndPoint(String dtid) { + ServerPushEndPoint endpoint = endPointMap.remove(dtid); + return endpoint != null; + } + + /** + * Get web socket end point for desktop + * @param dtid Desktop id + * @return Web socket end point + */ + public static ServerPushEndPoint getEndPoint(String dtid) { + ServerPushEndPoint endpoint = endPointMap.get(dtid); + return endpoint; + } +} diff --git a/org.adempiere.ui.zk/WEB-INF/src/web/js/org/idempiere/websocket/serverpush.js b/org.adempiere.ui.zk/WEB-INF/src/web/js/org/idempiere/websocket/serverpush.js new file mode 100644 index 0000000000..f8cab97c5b --- /dev/null +++ b/org.adempiere.ui.zk/WEB-INF/src/web/js/org/idempiere/websocket/serverpush.js @@ -0,0 +1,62 @@ +(function() { + org.idempiere.websocket.startServerPush = function(dtid) { + var dt = zk.Desktop.$(dtid); + if (dt._serverpush) + dt._serverpush.stop(); + + var spush = new org.idempiere.websocket.ServerPush(dt); + spush.start(); + }; + org.idempiere.websocket.stopServerPush = function(dtid) { + var dt = zk.Desktop.$(dtid); + if (dt._serverpush) + dt._serverpush.stop(); + }; + org.idempiere.websocket.ServerPush = zk.$extends(zk.Object, { + desktop: null, + socket: null, + active: false, + delay: 10, + $init: function(desktop) { + this.desktop = desktop; + }, + start: function() { + this.desktop._serverpush = this; + var url = window.location.protocol == "http:" ? "ws://" : "wss://"; + var path = window.location.href.substring(window.location.protocol.length+2); + path = path.substring(location.host.length+1); + var last=path.lastIndexOf("/"); + if (last > 0) { + path = path.substring(0, last); + } else { + path = ""; + } + url = url + window.location.host + "/" + path + "/serverpush/" + this.desktop.id; + var me = this; + this.socket = new WebSocket(url); + this.socket.onopen = function (event) { + me.active = true; + }; + this.socket.onmessage = function (event) { + if (event.data=="echo") { + zAu.cmd0.echo(this.desktop); + } else if (event.data=="stop") { + me.stop(); + } + } + this.socket.onerror = function(event) { + console.log(event); + }; + }, + stop: function() { + this.active = false; + this.desktop._serverpush = null; + if (this.socket) { + try { + this.socket.close(); + } catch (error) {} + this.socket = null; + } + } + }); +})(); \ No newline at end of file diff --git a/org.adempiere.ui.zk/WEB-INF/src/web/js/org/idempiere/websocket/zk.wpd b/org.adempiere.ui.zk/WEB-INF/src/web/js/org/idempiere/websocket/zk.wpd new file mode 100644 index 0000000000..3fcdbfa87e --- /dev/null +++ b/org.adempiere.ui.zk/WEB-INF/src/web/js/org/idempiere/websocket/zk.wpd @@ -0,0 +1,4 @@ + + +