IDEMPIERE-3840 Use web socket for server push. Fix initial connection setup. Fix handling of timeout.

This commit is contained in:
Heng Sin Low 2019-01-03 21:38:14 +08:00
parent 16595cca41
commit 5d65cc419d
3 changed files with 60 additions and 21 deletions

View File

@ -82,6 +82,15 @@ public class ServerPushEndPoint {
@OnMessage
public void onMessage(Session session, String message) {
if (session == this.session && !Util.isEmpty(message)) {
if (message.equals("__ping__")) {
try {
session.getBasicRemote().sendText("__pong__");
} catch (IllegalArgumentException | IOException e) {
CLogger.getCLogger(getClass()).log(Level.WARNING, e.getMessage(), e);
}
}
}
}
/**

View File

@ -62,6 +62,7 @@ public class WebSocketServerPush implements ServerPush {
private final Object _mutex = new Object();
private final static Map<String, ServerPushEndPoint> endPointMap = new ConcurrentHashMap<>();
private final static Map<String, Boolean> unregisterMap = new ConcurrentHashMap<>();
private final static ServerPushEndPoint STUB = new ServerPushEndPoint();
private List<Schedule<Event>> schedules = new ArrayList<>();
@ -120,7 +121,7 @@ public class WebSocketServerPush implements ServerPush {
if (endPoint == null) {
if (dt.isServerPushEnabled()) {
try {
Thread.sleep(5000);
Thread.sleep(2000);
} catch (InterruptedException e) {
}
endPoint = getEndPoint(dt.getId());
@ -184,7 +185,8 @@ public class WebSocketServerPush implements ServerPush {
if (dt != null) {
ServerPushEndPoint endPoint = getEndPoint(dt.getId());
if (endPoint == null) {
if (dt.isServerPushEnabled()) {
if (dt.isServerPushEnabled() && unregisterMap.remove(dt.getId(), Boolean.TRUE)) {
registerEndPoint(dt.getId(), STUB);
startServerPushAtClient(dt);
}
}
@ -286,6 +288,9 @@ public class WebSocketServerPush implements ServerPush {
*/
public static boolean unregisterEndPoint(String dtid) {
ServerPushEndPoint endpoint = endPointMap.remove(dtid);
if (endpoint != null) {
unregisterMap.put(dtid, Boolean.TRUE);
}
return endpoint != null;
}

View File

@ -1,27 +1,36 @@
(function() {
org.idempiere.websocket.startServerPush = function(dtid) {
var dt = zk.Desktop.$(dtid);
if (dt._serverpush)
dt._serverpush.stop();
var spush = new org.idempiere.websocket.ServerPush(dt);
spush.start();
if (dt._serverpush && dt._serverpush.socket) {
if (dt._serverpush._reconnectId) {
return;
}
dt._serverpush.restart();
} else {
var spush = new org.idempiere.websocket.ServerPush(dtid);
spush.start();
}
};
org.idempiere.websocket.stopServerPush = function(dtid) {
var dt = zk.Desktop.$(dtid);
if (dt._serverpush)
dt._serverpush.stop();
};
org.idempiere.websocket.ServerPush = zk.$extends(zk.Object, {
desktop: null,
socket: null,
active: false,
delay: 10,
$init: function(desktop) {
this.desktop = desktop;
org.idempiere.websocket.ServerPush = zk.$extends(zk.Object, {
$init: function(dtid) {
this.dtid = dtid;
this.socket = null;
this.active = false;
this.reconnect = false;
var desktop = zk.Desktop.$(this.dtid);
desktop._serverpush = this;
},
start: function() {
this.desktop._serverpush = this;
var desktop = zk.Desktop.$(this.dtid);
if (typeof desktop === 'undefined')
return;
this.reconnect = false;
this._reconnectId = null;
var url = window.location.protocol == "http:" ? "ws://" : "wss://";
var path = window.location.href.substring(window.location.protocol.length+2);
path = path.substring(location.host.length+1);
@ -31,32 +40,48 @@
} else {
path = "/serverpush/";
}
url = url + window.location.host + path + this.desktop.id;
url = url + window.location.host + path + desktop.id;
var me = this;
this.socket = new WebSocket(url);
this.socket.onopen = function (event) {
me.active = true;
me.socket.send("__ping__");
};
this.socket.onmessage = function (event) {
if (event.data=="echo") {
zAu.cmd0.echo(this.desktop);
var desktop = zk.Desktop.$(me.dtid);
zAu.cmd0.echo(desktop);
} else if (event.data=="stop") {
me.stop();
}
}
this.socket.onclose = function (event) {
if (me.socket) {
me.socket = null;
}
if (me.reconnect) {
me._reconnectId = setTimeout(function() {me.start();}, 2000);
}
}
this.socket.onerror = function(event) {
console.log(event);
};
},
stop: function() {
this.active = false;
this.desktop._serverpush = null;
var desktop = zk.Desktop.$(this.dtid);
desktop._serverpush = null;
if (this.socket) {
try {
this.socket.close();
} catch (error) {}
this.socket = null;
this.socket.close(1000);
} catch (error) {
console.log(error);
}
}
},
restart: function() {
this.reconnect = true;
this.stop();
}
});
})();