IDEMPIERE-175 Performance: Use atmosphere ( long pooling, NIO ) server push

This commit is contained in:
Heng Sin Low 2012-04-29 04:38:26 +08:00
parent 409453dca8
commit 8bf2eb7614
13 changed files with 2064 additions and 84 deletions

View File

@ -17,6 +17,7 @@
package org.adempiere.util;
import java.io.Serializable;
import java.util.Properties;
/**
@ -25,7 +26,7 @@ import java.util.Properties;
* @date Feb 25, 2007
* @version $Revision: 0.10 $
*/
public final class ServerContext
public final class ServerContext implements Serializable
{
/**
* generated serial version Id

View File

@ -4,7 +4,6 @@
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
<classpathentry kind="src" path="WEB-INF/src"/>
<classpathentry exported="true" kind="lib" path="WEB-INF/classes/" sourcepath="WEB-INF/src"/>
<classpathentry kind="lib" path="WEB-INF/lib/slf4j-api-1.6.1.jar"/>
<classpathentry kind="lib" path="WEB-INF/lib/atmosphere-compat-jbossweb-0.9.jar"/>
<classpathentry kind="lib" path="WEB-INF/lib/atmosphere-compat-tomcat-0.9.jar"/>
<classpathentry kind="lib" path="WEB-INF/lib/atmosphere-compat-tomcat7-0.9.jar" sourcepath="WEB-INF/lib/src/atmosphere-compat-tomcat7-0.9-sources.jar"/>

View File

@ -10,13 +10,15 @@ Import-Package: javax.servlet,
org.apache.commons.codec.binary,
org.apache.ecs,
org.apache.ecs.xhtml,
org.osgi.framework;version="1.5.0"
org.osgi.framework;version="1.5.0",
org.slf4j;version="1.6.1",
org.slf4j.helpers;version="1.6.1",
org.slf4j.spi;version="1.6.1"
Bundle-ClassPath: WEB-INF/classes/,
WEB-INF/lib/atmosphere-runtime-0.9.jar,
WEB-INF/lib/atmosphere-compat-jbossweb-0.9.jar,
WEB-INF/lib/atmosphere-compat-tomcat-0.9.jar,
WEB-INF/lib/atmosphere-compat-tomcat7-0.9.jar,
WEB-INF/lib/slf4j-api-1.6.1.jar
WEB-INF/lib/atmosphere-compat-tomcat7-0.9.jar
Export-Package: metainfo.zk,
org.adempiere.webui,
org.adempiere.webui.acct,
@ -40,24 +42,7 @@ Export-Package: metainfo.zk,
org.adempiere.webui.session,
org.adempiere.webui.theme,
org.adempiere.webui.util,
org.adempiere.webui.window,
org.atmosphere.cache,
org.atmosphere.client,
org.atmosphere.config,
org.atmosphere.container,
org.atmosphere.container.version,
org.atmosphere.cpr,
org.atmosphere.di,
org.atmosphere.handler,
org.atmosphere.util,
org.atmosphere.util.uri,
org.atmosphere.websocket,
org.atmosphere.websocket.protocol,
org.jboss.servlet.http,
org.apache.catalina,
org.apache.catalina.comet,
org.slf4j.helpers,
org.slf4j.spi
org.adempiere.webui.window
Require-Bundle: org.adempiere.report.jasper;bundle-version="1.0.0",
org.adempiere.base;bundle-version="1.0.0",
org.adempiere.report.jasper.library;bundle-version="1.0.0",

View File

@ -19,6 +19,7 @@ the License.
*/
package fi.jawsy.jawwa.zk.atmosphere;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.atmosphere.cpr.AtmosphereResource;
@ -112,17 +113,35 @@ public class AtmosphereServerPush implements ServerPush {
return true;
}
public void clearResource(AtmosphereResource resource) {
public synchronized void clearResource(AtmosphereResource resource) {
this.resource.compareAndSet(resource, null);
}
private void commitResponse() {
private synchronized void commitResponse() throws IOException {
AtmosphereResource resource = this.resource.getAndSet(null);
if (resource != null) {
resource.resume();
resource.resume();
}
}
private synchronized void onPush() throws IOException {
AtmosphereResource resource = this.resource.get();
if (resource != null) {
switch (resource.transport()) {
case JSONP:
case LONG_POLLING:
if (resource.isSuspended())
commitResponse();
break;
case WEBSOCKET :
case STREAMING:
resource.getResponse().getWriter().write("@");
resource.getResponse().getWriter().flush();
break;
}
}
}
@Override
public boolean deactivate(boolean stop) {
boolean stopped = false;
@ -157,10 +176,17 @@ public class AtmosphereServerPush implements ServerPush {
}
@Override
public <T extends Event> void schedule(EventListener<T> task, T event,
public synchronized <T extends Event> void schedule(EventListener<T> task, T event,
Scheduler<T> scheduler) {
boolean pendingPush = ((DesktopCtrl)desktop.get()).scheduledServerPush();
scheduler.schedule(task, event);
commitResponse();
if (!pendingPush || (this.resource.get() != null && this.resource.get().isSuspended())) {
try {
onPush();
} catch (IOException e) {
log.warn(e.getLocalizedMessage(), e);
}
}
}
@Override
@ -187,32 +213,40 @@ public class AtmosphereServerPush implements ServerPush {
log.debug("Stopping server push for " + desktop);
Clients.response("jawwa.atmosphere.serverpush", new AuScript(null, "jawwa.atmosphere.stopServerPush('" + desktop.getId() + "');"));
commitResponse();
try {
commitResponse();
} catch (IOException e) {
}
}
public void updateResource(AtmosphereResource resource) {
commitResponse();
public synchronized void onRequest(AtmosphereResource resource) {
if (this.resource.get() != null) {
AtmosphereResource aResource = this.resource.get();
if (aResource.isSuspended()) {
try {
commitResponse();
} catch (IOException e) {
e.printStackTrace();
}
}
}
boolean shouldSuspend = true;
Desktop desktop = this.desktop.get();
if (desktop == null) {
return;
}
Desktop desktop = this.desktop.get();
if (desktop != null && desktop instanceof DesktopCtrl)
{
if (((DesktopCtrl)desktop).scheduledServerPush())
{
return;
}
}
if (desktop instanceof DesktopCtrl) {
DesktopCtrl desktopCtrl = (DesktopCtrl) desktop;
shouldSuspend = !desktopCtrl.scheduledServerPush();
}
if (shouldSuspend) {
resource.suspend(timeout, false);
this.resource.set(resource);
} else {
this.resource.set(null);
}
this.resource.set(resource);
if (!resource.isSuspended()) {
resource.suspend(-1, true);
}
}
private static class ThreadInfo {
private static class ThreadInfo {
private final Thread thread;
/** # of activate() was called. */
private int nActive;

View File

@ -118,11 +118,12 @@ public class ZkAtmosphereHandler implements AtmosphereHandler {
if (error != null && serverPushEither.getRightValue() == null) {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
response.getWriter().write(error);
response.getWriter().flush();
return;
}
AtmosphereServerPush serverPush = serverPushEither.getRightValue();
serverPush.updateResource(resource);
serverPush.onRequest(resource);
}
@Override

View File

@ -24,6 +24,7 @@ import java.util.Properties;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpSession;
import org.adempiere.util.ServerContext;
import org.adempiere.webui.apps.AEnv;
import org.adempiere.webui.component.DrillCommand;
import org.adempiere.webui.component.TokenCommand;
@ -127,6 +128,8 @@ public class AdempiereWebUI extends Window implements EventListener<Event>, IWeb
loginCompleted();
}
Executions.getCurrent().getDesktop().enableServerPush(true);
Executions.getCurrent().getDesktop().addListener(new DrillCommand());
Executions.getCurrent().getDesktop().addListener(new TokenCommand());
Executions.getCurrent().getDesktop().addListener(new ZoomCommand());
@ -283,6 +286,9 @@ public class AdempiereWebUI extends Window implements EventListener<Event>, IWeb
ctx.put(ZK_DESKTOP_SESSION_KEY, this.getPage().getDesktop());
}
//update session context
currSess.setAttribute(SessionContextListener.SESSION_CTX, ServerContext.getCurrentInstance());
if ("Y".equalsIgnoreCase(Env.getContext(ctx, BrowserToken.REMEMBER_ME)) && MSystem.isZKRememberUserAllowed())
{
MUser user = MUser.get(ctx);

View File

@ -41,7 +41,7 @@ import org.zkoss.zul.Vbox;
* Contributors:
* CarlosRuiz - globalqss - Add unprocessed button to iDempiere
*/
public class DPActivities extends DashboardPanel implements EventListener {
public class DPActivities extends DashboardPanel implements EventListener<Event> {
/**
*

View File

@ -59,7 +59,7 @@ public class SessionContextListener implements ExecutionInit,
* @param exec
* @param createNew
*/
private void setupExecutionContextFromSession(Execution exec) {
public static void setupExecutionContextFromSession(Execution exec) {
Session session = exec.getDesktop().getSession();
Properties ctx = (Properties)session.getAttribute(SESSION_CTX);
HttpSession httpSession = (HttpSession)session.getNativeSession();
@ -115,7 +115,7 @@ public class SessionContextListener implements ExecutionInit,
* @param errs
* @see ExecutionCleanup#cleanup(Execution, Execution, List)
*/
public void cleanup(Execution exec, Execution parent, List errs)
public void cleanup(Execution exec, Execution parent, List<Throwable> errs)
{
//in servlet thread
if (parent == null)
@ -260,12 +260,12 @@ public class SessionContextListener implements ExecutionInit,
* @param errs
* @see EventThreadCleanup#cleanup(Component, Event, List)
*/
public void cleanup(Component comp, Event evt, List errs) throws Exception
public void cleanup(Component comp, Event evt, List<Throwable> errs) throws Exception
{
//in event processing thread
}
private boolean isContextValid() {
public static boolean isContextValid() {
Execution exec = Executions.getCurrent();
Properties ctx = ServerContext.getCurrentInstance();
if (ctx == null)
@ -278,6 +278,24 @@ public class SessionContextListener implements ExecutionInit,
{
return false;
}
Properties sessionCtx = (Properties) session.getAttribute(SESSION_CTX);
if (sessionCtx != null)
{
if (Env.getAD_Client_ID(sessionCtx) != Env.getAD_Client_ID(ctx))
{
return false;
}
if (Env.getAD_User_ID(sessionCtx) != Env.getAD_User_ID(ctx))
{
return false;
}
if (Env.getAD_Role_ID(sessionCtx) != Env.getAD_Role_ID(ctx))
{
return false;
}
}
return true;
}
}

View File

@ -14,6 +14,7 @@
package org.adempiere.webui.util;
import org.adempiere.exceptions.AdempiereException;
import org.adempiere.webui.session.SessionContextListener;
import org.zkoss.zk.ui.Desktop;
import org.zkoss.zk.ui.DesktopUnavailableException;
import org.zkoss.zk.ui.Executions;
@ -48,6 +49,9 @@ public class ServerPushTemplate {
EventListener<Event> task = new EventListener<Event>() {
@Override
public void onEvent(Event event) throws Exception {
if (!SessionContextListener.isContextValid()) {
SessionContextListener.setupExecutionContextFromSession(desktop.getExecution());
}
callback.updateUI();
}
};

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,7 @@
timeout: 300000,
delay: 1000,
failures: 0,
count: 0,
$init: function(desktop, timeout) {
this.desktop = desktop;
@ -36,31 +37,41 @@
return;
var me = this;
var jqxhr = $.ajax({
url: zk.ajaxURI("/comet", {
au: true
}),
type: "GET",
cache: false,
async: true,
global: false,
data: {
dtid: this.desktop.id
},
dataType: "",
timeout: me.timeout,
transport : 'long-polling',
error: function(jqxhr, textStatus, errorThrown) {
me.failures += 1;
me._schedule();
},
success: function(data) {
zAu.cmd0.echo(me.desktop);
me.failures = 0;
me._schedule();
}
});
this._req = jqxhr;
var socket = $.atmosphere;
var request = {
url: zk.ajaxURI("/comet", {
au: true
}),
logLevel : 'debug',
transport : 'streaming',
fallbackTransport: 'long-polling',
method: "GET",
cache: false,
async: true,
timeout: me.timeout,
onError: function(response) {
me.failures += 1;
me.count--;
if (response.transport == 'long-polling' && me.count == 0) {
me._schedule();
} else if (me.failures >= 10) {
me.stop();
}
},
onMessage: function(response) {
zAu.cmd0.echo(me.desktop);
me.failures = 0;
me.count--;
if (response.transport == 'long-polling' && me.count == 0) {
me._schedule();
}
}
};
request.url = request.url+'?dtid='+me.desktop.id;
this.count++;
socket.subscribe(request);
},
start: function() {
this.desktop._serverpush = this;

View File

@ -1,4 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<package name="jawwa.atmosphere" language="xul/html">
<script src="jquery.atmosphere.js" />
<script src="serverpush.js" />
</package>

View File

@ -14,8 +14,7 @@ bin.includes = META-INF/,\
WEB-INF/lib/atmosphere-runtime-0.9.jar,\
WEB-INF/lib/atmosphere-compat-jbossweb-0.9.jar,\
WEB-INF/lib/atmosphere-compat-tomcat-0.9.jar,\
WEB-INF/lib/atmosphere-compat-tomcat7-0.9.jar,\
WEB-INF/lib/slf4j-api-1.6.1.jar
WEB-INF/lib/atmosphere-compat-tomcat7-0.9.jar
src.includes = WEB-INF/classes/,\
WEB-INF/tld/,\
WEB-INF/web.xml,\