IDEMPIERE-502 Use single thread pool for client and server process

This commit is contained in:
Heng Sin Low 2012-11-14 10:11:17 +08:00
parent ad50b501b2
commit 640888051a
9 changed files with 260 additions and 390 deletions

View File

@ -23,8 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@ -50,6 +49,7 @@ import org.compiere.util.Ini;
import org.compiere.util.Login;
import org.compiere.util.SecureEngine;
import org.compiere.util.SecureInterface;
import org.compiere.util.Trx;
import org.compiere.util.Util;
import org.eclipse.core.runtime.IProduct;
import org.eclipse.core.runtime.Platform;
@ -111,7 +111,7 @@ public final class Adempiere
private static CLogger log = null;
/** Thread pool **/
private static ThreadPoolExecutor threadPoolExecutor = null;
private static ScheduledThreadPoolExecutor threadPoolExecutor = null;
/** A list of event listeners for this component. */
private static EventListenerList m_listenerList = new EventListenerList();
@ -553,8 +553,10 @@ public final class Adempiere
} // startup
private static void createThreadPool() {
int min = 10;
int max = 200;
int max = Runtime.getRuntime().availableProcessors() * 3;
int min = max / 2;
int defaultMax = max;
int defaultMin = min;
Properties properties = Ini.getProperties();
String maxSize = properties.getProperty("MaxThreadPoolSize");
String minSize = properties.getProperty("MinThreadPoolSize");
@ -572,14 +574,19 @@ public final class Adempiere
max = min;
}
if (max <= 0) {
max = 200;
max = defaultMax;
}
if (min < 0) {
min = 10;
min = defaultMin;
}
// start thread pool
threadPoolExecutor = new ThreadPoolExecutor(min, max, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
threadPoolExecutor = new ScheduledThreadPoolExecutor(min);
threadPoolExecutor.setMaximumPoolSize(max);
threadPoolExecutor.setKeepAliveTime(10, TimeUnit.MINUTES);
threadPoolExecutor.allowCoreThreadTimeOut(true);
Trx.startTrxMonitor();
}
/**
@ -660,7 +667,7 @@ public final class Adempiere
}
}
public static ThreadPoolExecutor getThreadPoolExecutor() {
public static ScheduledThreadPoolExecutor getThreadPoolExecutor() {
return threadPoolExecutor;
}

View File

@ -25,9 +25,11 @@ import java.sql.Savepoint;
import java.util.Collection;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.adempiere.exceptions.AdempiereException;
import org.compiere.Adempiere;
import org.compiere.model.PO;
/**
@ -85,13 +87,9 @@ public class Trx implements VetoableChangeListener
private static Trx.TrxMonitor s_monitor = new Trx.TrxMonitor();
static
public static void startTrxMonitor()
{
Thread monitorThread = new Thread(s_monitor);
monitorThread.setDaemon(true);
monitorThread.setPriority(Thread.MIN_PRIORITY);
monitorThread.setName("Trx-Monitor");
monitorThread.start();
Adempiere.getThreadPoolExecutor().scheduleWithFixedDelay(s_monitor, 5, 5, TimeUnit.MINUTES);
}
/**
@ -611,32 +609,23 @@ public class Trx implements VetoableChangeListener
public void run()
{
for(;;)
if (Trx.s_cache != null && !Trx.s_cache.isEmpty())
{
if (Trx.s_cache != null && !Trx.s_cache.isEmpty())
Trx[] trxs = Trx.s_cache.values().toArray(new Trx[0]);
for(int i = 0; i < trxs.length; i++)
{
Trx[] trxs = Trx.s_cache.values().toArray(new Trx[0]);
for(int i = 0; i < trxs.length; i++)
{
if (trxs[i].m_startTime <= 0)
continue;
if (trxs[i].m_startTime <= 0)
continue;
long since = System.currentTimeMillis() - trxs[i].m_startTime;
if (since > trxs[i].getTimeout() * 1000)
{
trxs[i].log.log(Level.WARNING, "Transaction timeout. Name="+trxs[i].getTrxName() + ", timeout(sec)="+(since / 1000));
trxs[i].close();
}
long since = System.currentTimeMillis() - trxs[i].m_startTime;
if (since > trxs[i].getTimeout() * 1000)
{
trxs[i].log.log(Level.WARNING, "Transaction timeout. Name="+trxs[i].getTrxName() + ", timeout(sec)="+(since / 1000));
trxs[i].close();
}
}
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
Thread.interrupted();
}
}
}
}
private boolean isLocalTrx(String trxName)

View File

@ -26,13 +26,11 @@ import java.util.List;
import java.util.logging.Level;
import org.compiere.acct.DocManager;
import org.compiere.model.AdempiereProcessor2;
import org.compiere.model.MAcctProcessor;
import org.compiere.model.MAcctProcessorLog;
import org.compiere.model.MAcctSchema;
import org.compiere.model.MClient;
import org.compiere.model.MCost;
import org.compiere.model.MSchedule;
import org.compiere.util.DB;
import org.compiere.util.Env;
import org.compiere.util.TimeUtil;
@ -130,7 +128,7 @@ public class AcctProcessor extends AdempiereServer
pstmt.setInt(1, m_model.getAD_Client_ID());
pstmt.setBigDecimal(2, value);
rs = pstmt.executeQuery();
while (!isInterrupted() && rs.next())
while (!Thread.currentThread().isInterrupted() && rs.next())
{
BigDecimal processedOn = rs.getBigDecimal(1);
if (!listProcessedOn.contains(processedOn))
@ -198,7 +196,7 @@ public class AcctProcessor extends AdempiereServer
}
catch (Exception e)
{
log.log(Level.SEVERE, getName() + ": " + TableName, e);
log.log(Level.SEVERE, TableName, e);
ok = false;
}
if (!ok)

View File

@ -43,7 +43,7 @@ import org.compiere.wf.MWorkflowProcessor;
* @author Jorg Janke
* @version $Id: AdempiereServer.java,v 1.3 2006/10/09 00:23:26 jjanke Exp $
*/
public abstract class AdempiereServer extends Thread
public abstract class AdempiereServer implements Runnable
{
/**
* Create New Server Thead
@ -78,7 +78,6 @@ public abstract class AdempiereServer extends Thread
*/
protected AdempiereServer (AdempiereProcessor model, int initialNap)
{
super (AdempiereServerGroup.get(), null, model.getName(), 0);
p_model = model;
m_ctx = new Properties(model.getCtx());
if (p_system == null)
@ -86,7 +85,16 @@ public abstract class AdempiereServer extends Thread
p_client = MClient.get(m_ctx);
Env.setContext(m_ctx, "#AD_Client_ID", p_client.getAD_Client_ID());
m_initialNap = initialNap;
// log.info(model.getName() + " - " + getThreadGroup());
Timestamp dateNextRun = getDateNextRun(true);
if (dateNextRun != null)
m_nextWork = dateNextRun.getTime();
long now = System.currentTimeMillis();
if (m_nextWork > now)
{
m_sleepMS = m_nextWork - now;
}
} // ServerBase
/** The Processor Model */
@ -97,7 +105,7 @@ public abstract class AdempiereServer extends Thread
/** Milliseconds to sleep - 10 Min default */
protected long m_sleepMS = 600000;
/** Sleeping */
private volatile boolean m_sleeping = false;
private volatile boolean m_sleeping = true;
/** Server start time */
protected long m_start = 0;
/** Number of Work executions */
@ -137,40 +145,17 @@ public abstract class AdempiereServer extends Thread
return m_sleepMS;
} // getSleepMS
/**
* Sleep for set time
* @return true if not interrupted
*/
public boolean sleep()
public long getInitialNap()
{
if (isInterrupted())
{
log.info (getName() + ": interrupted");
return false;
}
log.fine(getName() + ": sleeping " + TimeUtil.formatElapsed(m_sleepMS));
m_sleeping = true;
try
{
sleep (m_sleepMS);
}
catch (InterruptedException e)
{
log.info (getName() + ": interrupted");
m_sleeping = false;
return false;
}
m_sleeping = false;
return true;
} // sleep
return m_initialNap;
}
/**
* Run Now
*/
public void runNow()
{
log.info(getName());
m_sleeping = false;
p_startWork = System.currentTimeMillis();
doWork();
long now = System.currentTimeMillis();
@ -183,7 +168,9 @@ public abstract class AdempiereServer extends Thread
p_model.setDateLastRun(new Timestamp(now));
p_model.saveEx();
//
log.fine(getName() + ": " + getStatistics());
if (log.isLoggable(Level.FINE))
log.fine(getStatistics());
m_sleeping = true;
} // runNow
/**************************************************************************
@ -191,76 +178,43 @@ public abstract class AdempiereServer extends Thread
*/
public void run ()
{
try
m_sleeping = false;
if (m_start == 0)
m_start = System.currentTimeMillis();
// ---------------
p_startWork = System.currentTimeMillis();
doWork();
long now = System.currentTimeMillis();
// ---------------
p_runCount++;
m_runLastMS = now - p_startWork;
m_runTotalMS += m_runLastMS;
// Finished work - calculate datetime for next run
Timestamp lastRun = new Timestamp(now);
if (p_model instanceof AdempiereProcessor2)
{
log.fine(getName() + ": pre-nap - " + m_initialNap);
sleep (m_initialNap * 1000);
}
catch (InterruptedException e)
{
log.log(Level.SEVERE, getName() + ": pre-nap interrupted", e);
return;
AdempiereProcessor2 ap = (AdempiereProcessor2) p_model;
if (ap.isIgnoreProcessingTime())
{
lastRun = new Timestamp(p_startWork);
}
}
m_start = System.currentTimeMillis();
while (true)
{
if (m_nextWork == 0)
{
Timestamp dateNextRun = getDateNextRun(true);
if (dateNextRun != null)
m_nextWork = dateNextRun.getTime();
}
long now = System.currentTimeMillis();
if (m_nextWork > now)
{
m_sleepMS = m_nextWork - now;
if (!sleep ())
break;
}
if (isInterrupted())
{
log.info (getName() + ": interrupted");
break;
}
m_nextWork = MSchedule.getNextRunMS(lastRun.getTime(),
p_model.getScheduleType(), p_model.getFrequencyType(),
p_model.getFrequency(), p_model.getCronPattern());
// ---------------
p_startWork = System.currentTimeMillis();
doWork();
now = System.currentTimeMillis();
// ---------------
p_runCount++;
m_runLastMS = now - p_startWork;
m_runTotalMS += m_runLastMS;
// Finished work - calculate datetime for next run
Timestamp lastRun = new Timestamp(now);
if (p_model instanceof AdempiereProcessor2)
{
AdempiereProcessor2 ap = (AdempiereProcessor2) p_model;
if (ap.isIgnoreProcessingTime())
{
lastRun = new Timestamp(p_startWork);
}
}
m_nextWork = MSchedule.getNextRunMS(lastRun.getTime(),
p_model.getScheduleType(), p_model.getFrequencyType(),
p_model.getFrequency(), p_model.getCronPattern());
m_sleepMS = m_nextWork - now;
log.info(getName() + " Next run: " + new Timestamp(m_nextWork) + " sleep " + m_sleepMS);
//
p_model.setDateLastRun(lastRun);
p_model.setDateNextRun(new Timestamp(m_nextWork));
p_model.saveEx();
//
log.fine(getName() + ": " + getStatistics());
if (!sleep())
break;
}
m_start = 0;
m_sleepMS = m_nextWork - now;
if (log.isLoggable(Level.INFO))
log.info(" Next run: " + new Timestamp(m_nextWork) + " sleep " + m_sleepMS);
//
p_model.setDateLastRun(lastRun);
p_model.setDateNextRun(new Timestamp(m_nextWork));
p_model.saveEx();
m_sleeping = true;
} // run
/**
@ -347,11 +301,8 @@ public abstract class AdempiereServer extends Thread
*/
public String toString ()
{
StringBuffer sb = new StringBuffer (getName())
.append (",Prio=").append(getPriority())
.append (",").append (getThreadGroup())
.append (",Alive=").append(isAlive())
.append (",Sleeping=").append(m_sleeping)
StringBuffer sb = new StringBuffer ()
.append ("Sleeping=").append(m_sleeping)
.append (",Last=").append(getDateLastRun());
if (m_sleeping)
sb.append (",Next=").append(getDateNextRun(false));
@ -392,6 +343,16 @@ public abstract class AdempiereServer extends Thread
} // getLogs
protected boolean isInterrupted() {
return Thread.currentThread().isInterrupted();
}
public String getName() {
return p_model.getName();
}
public static boolean isOKtoRunOnIP(AdempiereProcessor model) {
if (model instanceof AdempiereProcessor2) {
int AD_Schedule_ID = ((AdempiereProcessor2)model).getAD_Schedule_ID();

View File

@ -20,6 +20,8 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.adempiere.base.Service;
@ -75,7 +77,7 @@ public class AdempiereServerMgr
} // AdempiereServerMgr
/** The Servers */
private ArrayList<AdempiereServer> m_servers = new ArrayList<AdempiereServer>();
private ArrayList<ServerWrapper> m_servers = new ArrayList<ServerWrapper>();
/** Context */
private Properties m_ctx = Env.getCtx();
/** Start */
@ -107,7 +109,7 @@ public class AdempiereServerMgr
{
log.info("");
int noServers = 0;
m_servers=new ArrayList<AdempiereServer>();
m_servers=new ArrayList<ServerWrapper>();
// Accounting
MAcctProcessor[] acctModels = MAcctProcessor.getActive(m_ctx);
for (int i = 0; i < acctModels.length; i++)
@ -115,9 +117,9 @@ public class AdempiereServerMgr
MAcctProcessor pModel = acctModels[i];
AdempiereServer server = AdempiereServer.create(pModel);
if (server != null) {
server.start();
server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(server);
// server.start();
// server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(new ServerWrapper(server));
}
}
// Request
@ -127,9 +129,9 @@ public class AdempiereServerMgr
MRequestProcessor pModel = requestModels[i];
AdempiereServer server = AdempiereServer.create(pModel);
if (server != null) {
server.start();
server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(server);
// server.start();
// server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(new ServerWrapper(server));
}
}
// Workflow
@ -139,9 +141,9 @@ public class AdempiereServerMgr
MWorkflowProcessor pModel = workflowModels[i];
AdempiereServer server = AdempiereServer.create(pModel);
if (server != null) {
server.start();
server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(server);
// server.start();
// server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(new ServerWrapper(server));
}
}
// Alert
@ -151,9 +153,9 @@ public class AdempiereServerMgr
MAlertProcessor pModel = alertModels[i];
AdempiereServer server = AdempiereServer.create(pModel);
if (server != null) {
server.start();
server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(server);
// server.start();
// server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(new ServerWrapper(server));
}
}
// Scheduler
@ -163,9 +165,9 @@ public class AdempiereServerMgr
MScheduler pModel = schedulerModels[i];
AdempiereServer server = AdempiereServer.create(pModel);
if (server != null) {
server.start();
server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(server);
// server.start();
// server.setPriority(Thread.NORM_PRIORITY-2);
m_servers.add(new ServerWrapper(server));
}
}
// LDAP
@ -175,9 +177,9 @@ public class AdempiereServerMgr
MLdapProcessor lp = ldapModels[i];
AdempiereServer server = AdempiereServer.create(lp);
if (server != null) {
server.start();
server.setPriority(Thread.NORM_PRIORITY-1);
m_servers.add(server);
// server.start();
// server.setPriority(Thread.NORM_PRIORITY-1);
m_servers.add(new ServerWrapper(server));
}
}
@ -192,9 +194,9 @@ public class AdempiereServerMgr
{
for (AdempiereServer server : servers)
{
server.start();
server.setPriority(Thread.NORM_PRIORITY-1);
m_servers.add(server);
// server.start();
// server.setPriority(Thread.NORM_PRIORITY-1);
m_servers.add(new ServerWrapper(server));
}
}
}
@ -220,47 +222,17 @@ public class AdempiereServerMgr
public boolean startAll()
{
log.info ("");
AdempiereServer[] servers = getInActive();
ServerWrapper[] servers = getInActive();
for (int i = 0; i < servers.length; i++)
{
AdempiereServer server = servers[i];
ServerWrapper server = servers[i];
try
{
if (server.isAlive())
if (server.scheduleFuture != null || !server.scheduleFuture.isDone())
continue;
// Wait until dead
if (server.isInterrupted())
{
int maxWait = 10; // 10 iterations = 1 sec
while (server.isAlive())
{
if (maxWait-- == 0)
{
log.severe ("Wait timeout for interruped " + server);
break;
}
try
{
Thread.sleep(100); // 1/10 sec
}
catch (InterruptedException e)
{
log.log(Level.SEVERE, "While sleeping", e);
}
}
}
// Do start
if (!server.isAlive())
{
// replace
server = AdempiereServer.create (server.getModel());
if (server == null)
m_servers.remove(i);
else
m_servers.set(i, server);
server.start();
server.setPriority(Thread.NORM_PRIORITY-2);
}
// replace
server.start();
}
catch (Exception e)
{
@ -273,10 +245,10 @@ public class AdempiereServerMgr
int noStopped = 0;
for (int i = 0; i < servers.length; i++)
{
AdempiereServer server = servers[i];
ServerWrapper server = servers[i];
try
{
if (server.isAlive())
if (server.scheduleFuture != null && !server.scheduleFuture.isDone())
{
log.info("Alive: " + server);
noRunning++;
@ -294,7 +266,6 @@ public class AdempiereServerMgr
}
}
log.fine("Running=" + noRunning + ", Stopped=" + noStopped);
AdempiereServerGroup.get().dump();
return noStopped == 0;
} // startAll
@ -305,24 +276,16 @@ public class AdempiereServerMgr
*/
public boolean start (String serverID)
{
AdempiereServer server = getServer(serverID);
ServerWrapper server = getServer(serverID);
if (server == null)
return false;
if (server.isAlive())
if (server.scheduleFuture != null && !server.scheduleFuture.isDone())
return true;
try
{
// replace
int index = m_servers.indexOf(server);
server = AdempiereServer.create (server.getModel());
if (server == null)
m_servers.remove(index);
else
m_servers.set(index, server);
server.start();
server.setPriority(Thread.NORM_PRIORITY-2);
Thread.yield();
}
catch (Exception e)
{
@ -330,10 +293,7 @@ public class AdempiereServerMgr
return false;
}
log.info(server.toString());
AdempiereServerGroup.get().dump();
if (server == null)
return false;
return server.isAlive();
return (server.scheduleFuture != null && !server.scheduleFuture.isDone());
} // startIt
/**
@ -343,17 +303,16 @@ public class AdempiereServerMgr
public boolean stopAll()
{
log.info ("");
AdempiereServer[] servers = getActive();
ServerWrapper[] servers = getActive();
// Interrupt
for (int i = 0; i < servers.length; i++)
{
AdempiereServer server = servers[i];
ServerWrapper server = servers[i];
try
{
if (server.isAlive() && !server.isInterrupted())
if (server.scheduleFuture != null && !server.scheduleFuture.isDone())
{
server.setPriority(Thread.MAX_PRIORITY-1);
server.interrupt();
server.scheduleFuture.cancel(true);
}
}
catch (Exception e)
@ -366,11 +325,11 @@ public class AdempiereServerMgr
// Wait for death
for (int i = 0; i < servers.length; i++)
{
AdempiereServer server = servers[i];
ServerWrapper server = servers[i];
try
{
int maxWait = 10; // 10 iterations = 1 sec
while (server.isAlive())
while (server.scheduleFuture != null && !server.scheduleFuture.isDone())
{
if (maxWait-- == 0)
{
@ -391,10 +350,10 @@ public class AdempiereServerMgr
int noStopped = 0;
for (int i = 0; i < servers.length; i++)
{
AdempiereServer server = servers[i];
ServerWrapper server = servers[i];
try
{
if (server.isAlive())
if (server.scheduleFuture != null && !server.scheduleFuture.isDone())
{
log.warning ("Alive: " + server);
noRunning++;
@ -423,15 +382,15 @@ public class AdempiereServerMgr
*/
public boolean stop (String serverID)
{
AdempiereServer server = getServer(serverID);
ServerWrapper server = getServer(serverID);
if (server == null)
return false;
if (!server.isAlive())
if (server.scheduleFuture == null || server.scheduleFuture.isDone())
return true;
try
{
server.interrupt();
server.scheduleFuture.cancel(true);
Thread.sleep(10); // 1/100 sec
}
catch (Exception e)
@ -440,8 +399,7 @@ public class AdempiereServerMgr
return false;
}
log.info(server.toString());
AdempiereServerGroup.get().dump();
return !server.isAlive();
return (server.scheduleFuture == null || server.scheduleFuture.isDone());
} // stop
@ -459,16 +417,16 @@ public class AdempiereServerMgr
* Get Active Servers
* @return array of active servers
*/
protected AdempiereServer[] getActive()
protected ServerWrapper[] getActive()
{
ArrayList<AdempiereServer> list = new ArrayList<AdempiereServer>();
ArrayList<ServerWrapper> list = new ArrayList<ServerWrapper>();
for (int i = 0; i < m_servers.size(); i++)
{
AdempiereServer server = (AdempiereServer)m_servers.get(i);
if (server != null && server.isAlive() && !server.isInterrupted())
ServerWrapper server = (ServerWrapper)m_servers.get(i);
if (server != null && server.scheduleFuture != null && !server.scheduleFuture.isDone())
list.add (server);
}
AdempiereServer[] retValue = new AdempiereServer[list.size ()];
ServerWrapper[] retValue = new ServerWrapper[list.size ()];
list.toArray (retValue);
return retValue;
} // getActive
@ -477,16 +435,16 @@ public class AdempiereServerMgr
* Get InActive Servers
* @return array of inactive servers
*/
protected AdempiereServer[] getInActive()
protected ServerWrapper[] getInActive()
{
ArrayList<AdempiereServer> list = new ArrayList<AdempiereServer>();
ArrayList<ServerWrapper> list = new ArrayList<ServerWrapper>();
for (int i = 0; i < m_servers.size(); i++)
{
AdempiereServer server = (AdempiereServer)m_servers.get(i);
if (server != null && (!server.isAlive() || !server.isInterrupted()))
ServerWrapper server = m_servers.get(i);
if (server != null && (server.scheduleFuture == null || server.scheduleFuture.isDone()))
list.add (server);
}
AdempiereServer[] retValue = new AdempiereServer[list.size()];
ServerWrapper[] retValue = new ServerWrapper[list.size()];
list.toArray (retValue);
return retValue;
} // getInActive
@ -495,9 +453,9 @@ public class AdempiereServerMgr
* Get all Servers
* @return array of servers
*/
public AdempiereServer[] getAll()
public ServerWrapper[] getAll()
{
AdempiereServer[] retValue = new AdempiereServer[m_servers.size()];
ServerWrapper[] retValue = new ServerWrapper[m_servers.size()];
m_servers.toArray (retValue);
return retValue;
} // getAll
@ -507,14 +465,14 @@ public class AdempiereServerMgr
* @param serverID server id
* @return server or null
*/
public AdempiereServer getServer (String serverID)
public ServerWrapper getServer (String serverID)
{
if (serverID == null)
return null;
for (int i = 0; i < m_servers.size(); i++)
{
AdempiereServer server = (AdempiereServer)m_servers.get(i);
if (serverID.equals(server.getServerID()))
ServerWrapper server = m_servers.get(i);
if (serverID.equals(server.server.getServerID()))
return server;
}
return null;
@ -553,8 +511,8 @@ public class AdempiereServerMgr
int noStopped = 0;
for (int i = 0; i < m_servers.size(); i++)
{
AdempiereServer server = (AdempiereServer)m_servers.get(i);
if (server.isAlive())
ServerWrapper server = m_servers.get(i);
if (server.scheduleFuture != null && !server.scheduleFuture.isDone())
noRunning++;
else
noStopped++;
@ -574,4 +532,38 @@ public class AdempiereServerMgr
return m_start;
} // getStartTime
public static class ServerWrapper implements Runnable
{
protected AdempiereServer server;
protected volatile ScheduledFuture<?> scheduleFuture;
public ServerWrapper(AdempiereServer server) {
this.server = server;
start();
}
public void start() {
scheduleFuture = Adempiere.getThreadPoolExecutor().schedule(this, server.getInitialNap() * 1000 + server.getSleepMS(), TimeUnit.MILLISECONDS);
}
@Override
public void run() {
server.run();
scheduleFuture = Adempiere.getThreadPoolExecutor().schedule(this, server.getSleepMS(), TimeUnit.MILLISECONDS);
}
public AdempiereServer getServer() {
return server;
}
public boolean isAlive() {
return scheduleFuture != null && !scheduleFuture.isDone();
}
public boolean isInterrupted() {
return scheduleFuture != null && scheduleFuture.isCancelled();
}
}
} // AdempiereServerMgr

View File

@ -454,8 +454,8 @@ public class AlertProcessor extends AdempiereServer
Adempiere.startup(true);
MAlertProcessor model = new MAlertProcessor (Env.getCtx(), 100, null);
AlertProcessor ap = new AlertProcessor(model);
ap.start();
AdempiereServerMgr.ServerWrapper wrapper = new AdempiereServerMgr.ServerWrapper(ap);
wrapper.start();
}

View File

@ -20,7 +20,6 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
@ -65,9 +64,9 @@ import org.compiere.model.AdempiereProcessorLog;
import org.compiere.model.MClient;
import org.compiere.model.MStore;
import org.compiere.model.MSystem;
import org.compiere.server.AdempiereServer;
import org.compiere.server.AdempiereServerGroup;
import org.compiere.server.AdempiereServerMgr;
import org.compiere.server.AdempiereServerMgr.ServerWrapper;
import org.compiere.util.CLogFile;
import org.compiere.util.CLogMgt;
import org.compiere.util.CLogger;
@ -181,8 +180,8 @@ public class AdempiereMonitor extends HttpServlet
return false;
log.info ("ServerID=" + serverID);
AdempiereServer server = m_serverMgr.getServer(serverID);
if (server == null)
ServerWrapper server = m_serverMgr.getServer(serverID);
if (server == null || server.getServer() == null)
{
m_message = new p();
m_message.addElement(new strong("Server not found: "));
@ -199,7 +198,7 @@ public class AdempiereMonitor extends HttpServlet
para.addElement(link);
b.addElement(para);
//
b.addElement(new h2(server.getName()));
b.addElement(new h2(server.getServer().getName()));
//
table table = new table();
table.setBorder(1);
@ -216,7 +215,7 @@ public class AdempiereMonitor extends HttpServlet
// line.addElement(new th().addElement("Description"));
table.addElement(line);
AdempiereProcessorLog[] logs = server.getLogs();
AdempiereProcessorLog[] logs = server.getServer().getLogs();
for (int i = 0; i < logs.length; i++)
{
AdempiereProcessorLog pLog = logs[i];
@ -252,8 +251,8 @@ public class AdempiereMonitor extends HttpServlet
return false;
log.info ("ServerID=" + serverID);
AdempiereServer server = m_serverMgr.getServer(serverID);
if (server == null)
ServerWrapper server = m_serverMgr.getServer(serverID);
if (server == null || server.getServer() == null)
{
m_message = new p();
m_message.addElement(new strong("Server not found: "));
@ -261,7 +260,7 @@ public class AdempiereMonitor extends HttpServlet
return false;
}
//
server.runNow();
server.getServer().runNow();
//
return true;
} // processRunParameter
@ -306,8 +305,8 @@ public class AdempiereMonitor extends HttpServlet
this.createSummaryPage(request, response,true);
m_dirAccessList = getDirAcessList();
} else {
AdempiereServer server = m_serverMgr.getServer(serverID);
if (server == null) {
ServerWrapper server = m_serverMgr.getServer(serverID);
if (server == null || server.getServer() == null) {
m_message = new p();
m_message.addElement(new strong("Server not found: "));
m_message.addElement(serverID);
@ -317,7 +316,7 @@ public class AdempiereMonitor extends HttpServlet
ok = m_serverMgr.start(serverID);
else
ok = m_serverMgr.stop(serverID);
m_message.addElement(server.getName());
m_message.addElement(server.getServer().getName());
}
}
}
@ -622,13 +621,13 @@ public class AdempiereMonitor extends HttpServlet
// ***** Server Links *****
bb.addElement(new hr());
para = new p();
AdempiereServer[] servers = m_serverMgr.getAll();
ServerWrapper[] servers = m_serverMgr.getAll();
for (int i = 0; i < servers.length; i++)
{
if (i > 0)
para.addElement(new br());
AdempiereServer server = servers[i];
link = new a ("#" + server.getServerID(), server.getName());
ServerWrapper server = servers[i];
link = new a ("#" + server.getServer().getServerID(), server.getServer().getName());
para.addElement(link);
font status = null;
if (server.isAlive())
@ -646,10 +645,10 @@ public class AdempiereMonitor extends HttpServlet
bb.removeEndEndModifier();
for (int i = 0; i < servers.length; i++)
{
AdempiereServer server = servers[i];
ServerWrapper server = servers[i];
bb.addElement(new hr());
bb.addElement(new a().setName(server.getServerID()));
bb.addElement(new h2(server.getName()));
bb.addElement(new a().setName(server.getServer().getServerID()));
bb.addElement(new h2(server.getServer().getName()));
//
table = new table();
table.setBorder(1);
@ -660,10 +659,8 @@ public class AdempiereMonitor extends HttpServlet
if (server.isAlive())
{
String msg = "Stop";
if (server.isInterrupted())
msg += " (Interrupted)";
link = new a ("adempiereMonitor?Action=Stop_" + server.getServerID(), msg);
if (server.isSleeping())
link = new a ("adempiereMonitor?Action=Stop_" + server.getServer().getServerID(), msg);
if (server.getServer().isSleeping())
{
line.addElement(new th().addElement("Sleeping"));
line.addElement(new td().addElement(link));
@ -676,47 +673,45 @@ public class AdempiereMonitor extends HttpServlet
table.addElement(line);
line = new tr();
line.addElement(new th().addElement("Start - Elapsed"));
line.addElement(new td().addElement(WebEnv.getCellContent(server.getStartTime())
+ " - " + TimeUtil.formatElapsed(server.getStartTime())));
line.addElement(new td().addElement(WebEnv.getCellContent(server.getServer().getStartTime())
+ " - " + TimeUtil.formatElapsed(server.getServer().getStartTime())));
}
else
{
String msg = "Start";
if (server.isInterrupted())
msg += " (Interrupted)";
line.addElement(new th().addElement("Not Started"));
link = new a ("adempiereMonitor?Action=Start_" + server.getServerID(), msg);
link = new a ("adempiereMonitor?Action=Start_" + server.getServer().getServerID(), msg);
line.addElement(new td().addElement(link));
}
table.addElement(line);
//
line = new tr();
line.addElement(new th().addElement("Description"));
line.addElement(new td().addElement(WebEnv.getCellContent(server.getDescription())));
line.addElement(new td().addElement(WebEnv.getCellContent(server.getServer().getDescription())));
table.addElement(line);
//
line = new tr();
line.addElement(new th().addElement("Last Run"));
line.addElement(new td().addElement(WebEnv.getCellContent(server.getDateLastRun())));
line.addElement(new td().addElement(WebEnv.getCellContent(server.getServer().getDateLastRun())));
table.addElement(line);
line = new tr();
line.addElement(new th().addElement("Info"));
line.addElement(new td().addElement(WebEnv.getCellContent(server.getServerInfo())));
line.addElement(new td().addElement(WebEnv.getCellContent(server.getServer().getServerInfo())));
table.addElement(line);
//
line = new tr();
line.addElement(new th().addElement("Next Run"));
td td = new td();
td.addElement(WebEnv.getCellContent(server.getDateNextRun(false)));
td.addElement(WebEnv.getCellContent(server.getServer().getDateNextRun(false)));
td.addElement(" - ");
link = new a ("adempiereMonitor?RunNow=" + server.getServerID(), "(Run Now)");
link = new a ("adempiereMonitor?RunNow=" + server.getServer().getServerID(), "(Run Now)");
td.addElement(link);
line.addElement(td);
table.addElement(line);
//
line = new tr();
line.addElement(new th().addElement("Statistics"));
line.addElement(new td().addElement(server.getStatistics()));
line.addElement(new td().addElement(server.getServer().getStatistics()));
table.addElement(line);
//
@ -725,7 +720,7 @@ public class AdempiereMonitor extends HttpServlet
link = new a ("#top", "Top");
bb.addElement(link);
bb.addElement(" - ");
link = new a ("adempiereMonitor?Log=" + server.getServerID(), "Log");
link = new a ("adempiereMonitor?Log=" + server.getServer().getServerID(), "Log");
bb.addElement(link);
bb.addElement(" - ");
link = new a ("adempiereMonitor", "Refresh");
@ -787,29 +782,29 @@ public class AdempiereMonitor extends HttpServlet
writer.print(m_serverMgr.getServerCount());
writer.println("</server-count>");
AdempiereServer[] servers = m_serverMgr.getAll();
ServerWrapper[] servers = m_serverMgr.getAll();
for (int i = 0; i < servers.length; i++)
{
AdempiereServer server = servers[i];
ServerWrapper server = servers[i];
writer.println("\t\t<server>");
writer.print("\t\t\t<id>");
writer.print(server.getServerID());
writer.print(server.getServer().getServerID());
writer.println("</id>");
writer.print("\t\t\t<name>");
writer.print(server.getName());
writer.print(server.getServer().getName());
writer.println("</name>");
writer.print("\t\t\t<description>");
writer.print(server.getDescription());
writer.print(server.getServer().getDescription());
writer.println("</description>");
writer.print("\t\t\t<info>");
writer.print(server.getServerInfo());
writer.print(server.getServer().getServerInfo());
writer.println("</info>");
writer.print("\t\t\t<status>");
if (server.isAlive())
{
if (server.isInterrupted())
writer.print("Interrupted");
else if (server.isSleeping())
else if (server.getServer().isSleeping())
writer.print("Sleeping");
else
writer.print("Running");
@ -818,16 +813,16 @@ public class AdempiereMonitor extends HttpServlet
writer.print("Stopped");
writer.println("</status>");
writer.print("\t\t\t<start-time>");
writer.print(server.getStartTime());
writer.print(server.getServer().getStartTime());
writer.println("</start-time>");
writer.print("\t\t\t<last-run>");
writer.print(server.getDateLastRun());
writer.print(server.getServer().getDateLastRun());
writer.println("</last-run>");
writer.print("\t\t\t<next-run>");
writer.print(server.getDateNextRun(false));
writer.print(server.getServer().getDateNextRun(false));
writer.println("</next-run>");
writer.print("\t\t\t<statistics>");
writer.print(server.getStatistics());
writer.print(server.getServer().getStatistics());
writer.println("</statistics>");
writer.println("\t\t</server>");
}

View File

@ -21,15 +21,12 @@ import java.util.Properties;
import java.util.logging.Level;
import org.adempiere.util.ServerContext;
import org.adempiere.webui.AdempiereWebUI;
import org.adempiere.webui.desktop.IDesktop;
import org.adempiere.webui.session.SessionContextListener;
import org.adempiere.webui.util.ServerPushTemplate;
import org.compiere.model.MSysConfig;
import org.compiere.util.CLogger;
import org.zkoss.util.Locales;
import org.zkoss.zk.ui.Desktop;
import org.zkoss.zk.ui.DesktopUnavailableException;
import org.zkoss.zk.ui.event.Events;
/**
@ -44,7 +41,6 @@ public class DashboardRunnable implements Runnable, Serializable
private static final long serialVersionUID = 5995227773511788894L;
private Desktop desktop;
private boolean stop = false;
private List<DashboardPanel> dashboardPanels;
private IDesktop appDesktop;
private Locale locale;
@ -72,77 +68,11 @@ public class DashboardRunnable implements Runnable, Serializable
public void run()
{
// default Update every one minutes
int interval = MSysConfig.getIntValue(MSysConfig.ZK_DASHBOARD_REFRESH_INTERVAL, 60000);
int cumulativeFailure = 0;
while(!stop) {
try {
Thread.sleep(interval);
} catch (InterruptedException e1) {
if (stop) break;
}
if (desktop.isAlive()) {
Locales.setThreadLocal(locale);
try {
refreshDashboard();
cumulativeFailure = 0;
} catch (DesktopUnavailableException de) {
cumulativeFailure++;
} catch (Exception e) {
logger.log(Level.INFO, e.getLocalizedMessage(), (e.getCause() != null ? e.getCause() : e));
cumulativeFailure++;
}
if (cumulativeFailure > 3)
break;
} else {
logger.log(Level.INFO, "Desktop destroy, will kill session.");
killSession();
break;
}
}
}
private void killSession() {
if (desktop.getSession() != null && desktop.getSession().getNativeSession() != null)
{
//differentiate between real destroy and refresh
try
{
Thread.sleep(90000);
}
catch (InterruptedException e)
{
try
{
desktop.getSession().getAttributes().clear();
desktop.getSession().invalidate();
}
catch (Exception e1) {}
return;
}
try
{
Object sessionObj = desktop.getSession().getAttribute(AdempiereWebUI.ZK_DESKTOP_SESSION_KEY);
if (sessionObj != null && sessionObj instanceof Desktop)
{
Desktop sessionDesktop = (Desktop) sessionObj;
//don't destroy session if it have been attached to another desktop ( refresh will do that )
if (sessionDesktop == desktop)
{
desktop.getSession().getAttributes().clear();
desktop.getSession().invalidate();
}
}
else
{
desktop.getSession().getAttributes().clear();
desktop.getSession().invalidate();
}
}
catch (Exception e1) {}
Locales.setThreadLocal(locale);
try {
refreshDashboard();
} catch (Exception e) {
logger.log(Level.INFO, e.getLocalizedMessage(), (e.getCause() != null ? e.getCause() : e));
}
}
@ -184,10 +114,6 @@ public class DashboardRunnable implements Runnable, Serializable
}
}
public void stop() {
stop = true;
}
/**
* Add DashboardPanel to the auto refresh list
* @param dashboardPanel

View File

@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.adempiere.webui.apps.AEnv;
@ -38,6 +40,7 @@ import org.adempiere.webui.report.HTMLExtension;
import org.adempiere.webui.session.SessionManager;
import org.adempiere.webui.window.FDialog;
import org.adempiere.webui.window.ZkReportViewerProvider;
import org.compiere.Adempiere;
import org.compiere.model.I_AD_Menu;
import org.compiere.model.MDashboardContent;
import org.compiere.model.MDashboardPreference;
@ -48,6 +51,7 @@ import org.compiere.model.MPInstancePara;
import org.compiere.model.MProcess;
import org.compiere.model.MQuery;
import org.compiere.model.MRole;
import org.compiere.model.MSysConfig;
import org.compiere.model.MTable;
import org.compiere.print.ReportEngine;
import org.compiere.process.ProcessInfo;
@ -91,8 +95,8 @@ public class DashboardController implements EventListener<Event> {
private List<Anchorchildren> columnList = new ArrayList<Anchorchildren>();
private Anchorlayout dashboardLayout;
private Anchorchildren maximizedHolder;
private Thread dashboardThread;
private DashboardRunnable dashboardRunnable;
private ScheduledFuture<?> dashboardFuture;
public DashboardController() {
dashboardLayout = new Anchorlayout();
@ -406,9 +410,9 @@ public class DashboardController implements EventListener<Event> {
{
dashboardRunnable.refreshDashboard();
dashboardThread = new Thread(dashboardRunnable, "UpdateInfo");
dashboardThread.setDaemon(true);
dashboardThread.start();
// default Update every one minutes
int interval = MSysConfig.getIntValue(MSysConfig.ZK_DASHBOARD_REFRESH_INTERVAL, 60000);
dashboardFuture = Adempiere.getThreadPoolExecutor().scheduleWithFixedDelay(dashboardRunnable, interval, interval, TimeUnit.MILLISECONDS);
}
}
@ -620,15 +624,14 @@ public class DashboardController implements EventListener<Event> {
* @param appDesktop
*/
public void onSetPage(Page page, Desktop desktop, IDesktop appDesktop) {
if (dashboardThread != null && dashboardThread.isAlive()) {
dashboardRunnable.stop();
dashboardThread.interrupt();
if (dashboardFuture != null && !dashboardFuture.isDone()) {
dashboardFuture.cancel(true);
DashboardRunnable tmp = dashboardRunnable;
dashboardRunnable = new DashboardRunnable(tmp, desktop, appDesktop);
dashboardThread = new Thread(dashboardRunnable, "UpdateInfo");
dashboardThread.setDaemon(true);
dashboardThread.start();
// default Update every one minutes
int interval = MSysConfig.getIntValue(MSysConfig.ZK_DASHBOARD_REFRESH_INTERVAL, 60000);
dashboardFuture = Adempiere.getThreadPoolExecutor().scheduleWithFixedDelay(dashboardRunnable, interval, interval, TimeUnit.MILLISECONDS);
}
}
@ -636,9 +639,8 @@ public class DashboardController implements EventListener<Event> {
* clean up for logout
*/
public void onLogOut() {
if (dashboardThread != null && dashboardThread.isAlive()) {
dashboardRunnable.stop();
dashboardThread.interrupt();
if (dashboardFuture != null && !dashboardFuture.isDone()) {
dashboardFuture.cancel(true);
}
}