IDEMPIERE-175 Atmosphere server push. Fine tuning and switch to the more reliable long polling as default.

This commit is contained in:
Heng Sin Low 2012-11-14 10:18:03 +08:00
parent 0b06a75a3b
commit 158ef94bcf
2 changed files with 29 additions and 38 deletions

View File

@ -34,7 +34,6 @@ import org.zkoss.zk.ui.UiException;
import org.zkoss.zk.ui.event.Event; import org.zkoss.zk.ui.event.Event;
import org.zkoss.zk.ui.event.EventListener; import org.zkoss.zk.ui.event.EventListener;
import org.zkoss.zk.ui.impl.ExecutionCarryOver; import org.zkoss.zk.ui.impl.ExecutionCarryOver;
import org.zkoss.zk.ui.sys.DesktopCtrl;
import org.zkoss.zk.ui.sys.Scheduler; import org.zkoss.zk.ui.sys.Scheduler;
import org.zkoss.zk.ui.sys.ServerPush; import org.zkoss.zk.ui.sys.ServerPush;
import org.zkoss.zk.ui.util.Clients; import org.zkoss.zk.ui.util.Clients;
@ -126,12 +125,12 @@ public class AtmosphereServerPush implements ServerPush {
private synchronized void onPush() throws IOException { private synchronized void onPush() throws IOException {
AtmosphereResource resource = this.resource.get(); AtmosphereResource resource = this.resource.get();
if (resource != null) { if (resource != null) {
switch (resource.transport()) { switch (resource.transport()) {
case JSONP: case POLLING:
case LONG_POLLING: case LONG_POLLING:
if (resource.isSuspended()) if (resource.isSuspended())
commitResponse(); commitResponse();
break; break;
case WEBSOCKET : case WEBSOCKET :
case STREAMING: case STREAMING:
@ -178,15 +177,12 @@ public class AtmosphereServerPush implements ServerPush {
@Override @Override
public synchronized <T extends Event> void schedule(EventListener<T> task, T event, public synchronized <T extends Event> void schedule(EventListener<T> task, T event,
Scheduler<T> scheduler) { Scheduler<T> scheduler) {
boolean pendingPush = ((DesktopCtrl)desktop.get()).scheduledServerPush();
scheduler.schedule(task, event); scheduler.schedule(task, event);
if (!pendingPush || (this.resource.get() != null && this.resource.get().isSuspended())) { try {
try { onPush();
onPush(); } catch (IOException e) {
} catch (IOException e) { log.error(e.getLocalizedMessage(), e);
log.warn(e.getLocalizedMessage(), e); }
}
}
} }
@Override @Override
@ -222,25 +218,19 @@ public class AtmosphereServerPush implements ServerPush {
public synchronized void onRequest(AtmosphereResource resource) { public synchronized void onRequest(AtmosphereResource resource) {
if (this.resource.get() != null) { if (this.resource.get() != null) {
AtmosphereResource aResource = this.resource.get(); AtmosphereResource aResource = this.resource.get();
if (aResource.isSuspended()) { if (aResource != resource) {
try { try {
commitResponse(); onPush();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); log.error(e.getLocalizedMessage(), e);
} }
} }
} }
Desktop desktop = this.desktop.get();
if (desktop != null && desktop instanceof DesktopCtrl)
{
if (((DesktopCtrl)desktop).scheduledServerPush())
{
return;
}
}
this.resource.set(resource); this.resource.set(resource);
if (log.isTraceEnabled()) {
log.trace(resource.transport().name());
}
if (!resource.isSuspended()) { if (!resource.isSuspended()) {
resource.suspend(-1, true); resource.suspend(-1, true);
} }

View File

@ -16,9 +16,8 @@
desktop: null, desktop: null,
active: false, active: false,
timeout: 300000, timeout: 300000,
delay: 1000, delay: 100,
failures: 0, failures: 0,
count: 0,
$init: function(desktop, timeout) { $init: function(desktop, timeout) {
this.desktop = desktop; this.desktop = desktop;
@ -37,40 +36,42 @@
return; return;
var me = this; var me = this;
var socket = $.atmosphere; var socket = $.atmosphere;
var request = { var request = {
url: zk.ajaxURI("/comet", { url: zk.ajaxURI("/comet", {
au: true au: true
}), }),
logLevel : 'debug', logLevel : 'debug',
transport : 'streaming', transport : 'long-polling',
fallbackTransport: 'long-polling', fallbackTransport: 'streaming',
method: "GET", method: "GET",
cache: false, cache: false,
async: true, async: true,
timeout: me.timeout, timeout: me.timeout,
onError: function(response) { onError: function(response) {
if (typeof console == "object") {
console.error(response);
}
me.failures += 1; me.failures += 1;
me.count--; if (me.failures < 10) {
if (response.transport == 'long-polling' && me.count == 0) { if (response.transport == 'long-polling') {
me._schedule(); me._schedule();
} else if (me.failures >= 10) { }
} else {
me.stop(); me.stop();
} }
}, },
onMessage: function(response) { onMessage: function(response) {
zAu.cmd0.echo(me.desktop); zAu.cmd0.echo(me.desktop);
me.failures = 0; me.failures = 0;
me.count--; if (response.transport == 'long-polling') {
if (response.transport == 'long-polling' && me.count == 0) { me._schedule();
me._schedule(); }
}
} }
}; };
request.url = request.url+'?dtid='+me.desktop.id; request.url = request.url+'?dtid='+me.desktop.id;
this.count++; socket.unsubscribe();
socket.subscribe(request); socket.subscribe(request);
}, },
start: function() { start: function() {