[IDEMPIERE-175] Performance: Use atmosphere ( long pooling, NIO ) server push

This commit is contained in:
Elaine Tan 2012-04-26 18:38:22 +08:00
parent 92f5d0245c
commit 0174cceaf6
1 changed files with 87 additions and 2 deletions

View File

@ -9,8 +9,11 @@ import org.zkoss.lang.Library;
import org.zkoss.zk.au.out.AuScript;
import org.zkoss.zk.ui.Desktop;
import org.zkoss.zk.ui.DesktopUnavailableException;
import org.zkoss.zk.ui.Executions;
import org.zkoss.zk.ui.UiException;
import org.zkoss.zk.ui.event.Event;
import org.zkoss.zk.ui.event.EventListener;
import org.zkoss.zk.ui.impl.ExecutionCarryOver;
import org.zkoss.zk.ui.sys.DesktopCtrl;
import org.zkoss.zk.ui.sys.Scheduler;
import org.zkoss.zk.ui.sys.ServerPush;
@ -31,6 +34,10 @@ public class AtmosphereServerPush implements ServerPush {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final AtomicReference<AtmosphereResource> resource = new AtomicReference<AtmosphereResource>();
private final int timeout;
private ThreadInfo _active;
private ExecutionCarryOver _carryOver;
private final Object _mutex = new Object();
public AtmosphereServerPush() {
String timeoutString = Library.getProperty("fi.jawsy.jawwa.zk.atmosphere.timeout");
@ -43,7 +50,47 @@ public class AtmosphereServerPush implements ServerPush {
@Override
public boolean activate(long timeout) throws InterruptedException, DesktopUnavailableException {
throw new UnsupportedOperationException("activate is not supported by AtmosphereServerPush");
final Thread curr = Thread.currentThread();
if (_active != null && _active.thread.equals(curr)) { //re-activate
++_active.nActive;
return true;
}
final ThreadInfo info = new ThreadInfo(curr);
EventListener<Event> task = new EventListener<Event>() {
@Override
public void onEvent(Event event) throws Exception {
if (event.getName().equals("onNewData"))
{
synchronized (_mutex) {
_carryOver = new ExecutionCarryOver(desktop.get());
synchronized (info) {
info.nActive = 1; //granted
info.notify();
}
try {
_mutex.wait(); //wait until the server push is done
} catch (InterruptedException ex) {
throw UiException.Aide.wrap(ex);
}
}
}
}
};
synchronized (info) {
Executions.schedule(desktop.get(), task, new Event("onNewData"));
if (info.nActive == 0)
info.wait(timeout <= 0 ? 10*60*1000: timeout);
}
_carryOver.carryOver();
_active = info;
return true;
}
public void clearResource(AtmosphereResource resource) {
@ -59,7 +106,26 @@ public class AtmosphereServerPush implements ServerPush {
@Override
public boolean deactivate(boolean stop) {
throw new UnsupportedOperationException("deactivate is not supported by AtmosphereServerPush");
boolean stopped = false;
if (_active != null && Thread.currentThread().equals(_active.thread)) {
if (--_active.nActive <= 0) {
if (stop)
{
stop();
stopped = true;
}
_carryOver.cleanup();
_carryOver = null;
_active.nActive = 0; //just in case
_active = null;
synchronized (_mutex) {
_mutex.notify();
}
}
}
return stopped;
}
@Override
@ -70,6 +136,13 @@ public class AtmosphereServerPush implements ServerPush {
@Override
public void onPiggyback() {
Desktop desktop = this.desktop.get();
if (desktop == null) {
return;
}
if (Executions.getCurrent() != null && _carryOver == null)
_carryOver = new ExecutionCarryOver(desktop);
}
@Override
@ -127,4 +200,16 @@ public class AtmosphereServerPush implements ServerPush {
this.resource.set(null);
}
}
private static class ThreadInfo {
private final Thread thread;
/** # of activate() was called. */
private int nActive;
private ThreadInfo(Thread thread) {
this.thread = thread;
}
public String toString() {
return "[" + thread + ',' + nActive + ']';
}
}
}