[ 1801842 ] DB connection fix & improvements for concurrent threads

- more robust connection and transaction management when using wan and vpn profile.
This commit is contained in:
Heng Sin Low 2007-10-05 10:41:42 +00:00
parent 370c205e17
commit 20eeced360
11 changed files with 324 additions and 106 deletions

View File

@ -257,7 +257,7 @@ public interface AdempiereDatabase
/**
* Get Status
* @return status info
* @return status info or null if no local datasource available
*/
public String getStatus();

View File

@ -950,7 +950,7 @@ public class CConnection implements Serializable, Cloneable
public Exception testDatabase(boolean retest)
{
// At this point Application Server Connection is tested.
if (isRMIoverHTTP())
if (DB.isRemoteObjects() || isRMIoverHTTP())
return null;
if (!retest && m_ds != null && m_okDB)
return null;
@ -1211,7 +1211,7 @@ public class CConnection implements Serializable, Cloneable
}
}
//hengsin, don't test datasource for wan profile
if (!isRMIoverHTTP())
if (!DB.isRemoteObjects() && !isRMIoverHTTP())
{
if (m_db != null) // test class loader ability
m_db.getDataSource(this);
@ -1602,7 +1602,7 @@ public class CConnection implements Serializable, Cloneable
setDbPort (svr.getDbPort ());
setDbName (svr.getDbName ());
setDbUid (svr.getDbUid ());
if (isRMIoverHTTP())
if (isRMIoverHTTP() || DB.isRemoteObjects())
setDbPwd ("");
else
setDbPwd (svr.getDbPwd ());

View File

@ -32,18 +32,26 @@ import java.sql.Statement;
import java.util.Map;
import org.compiere.util.DB;
import org.compiere.util.Trx;
/**
* Connection that is used to execute query on Server for Client processes
* Need for Jasper Report processes as the Jasper Manager uses a connection
*
* @author Ashley G Ramdass
* @author Teo Sarca, SC ARHIPAC SERVICE SRL
* <li>BF [ 1806700 ] Compile error on JAVA 6
*/
public class ServerConnection implements Connection
{
private String trxName = null;
public ServerConnection() {
}
public ServerConnection(String trxName) {
this.trxName = trxName;
}
public void clearWarnings() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method clearWarnings() not yet implemented.");
@ -51,12 +59,21 @@ public class ServerConnection implements Connection
public void close() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method close() not yet implemented.");
if (trxName != null) {
Trx trx = Trx.get(trxName, false);
if (trx != null)
trx.close();
trxName = null;
}
}
public void commit() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method commit() not yet implemented.");
if (trxName != null) {
Trx trx = Trx.get(trxName, false);
if (trx != null)
trx.commit(true);
}
}
public Statement createStatement() throws SQLException
@ -66,7 +83,7 @@ public class ServerConnection implements Connection
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException
{
return DB.createStatement(resultSetType, resultSetConcurrency, null);
return DB.createStatement(resultSetType, resultSetConcurrency, trxName);
}
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException
@ -76,7 +93,7 @@ public class ServerConnection implements Connection
public boolean getAutoCommit() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method getAutoCommit() not yet implemented.");
return (trxName != null);
}
public String getCatalog() throws SQLException
@ -96,7 +113,7 @@ public class ServerConnection implements Connection
public int getTransactionIsolation() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method getTransactionIsolation() not yet implemented.");
return Connection.TRANSACTION_READ_COMMITTED;
}
public Map<String, Class<?>> getTypeMap() throws SQLException
@ -106,17 +123,17 @@ public class ServerConnection implements Connection
public SQLWarning getWarnings() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method getWarnings() not yet implemented.");
return null;
}
public boolean isClosed() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method isClosed() not yet implemented.");
return false;
}
public boolean isReadOnly() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method isReadOnly() not yet implemented.");
return false;
}
public String nativeSQL(String sql) throws SQLException
@ -131,12 +148,12 @@ public class ServerConnection implements Connection
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException
{
return DB.prepareCall(sql, resultSetConcurrency, null);
return DB.prepareCall(sql, resultSetConcurrency, trxName);
}
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException
{
return DB.prepareCall(sql, resultSetConcurrency, null);
return DB.prepareCall(sql, resultSetConcurrency, trxName);
}
public PreparedStatement prepareStatement(String sql) throws SQLException
@ -176,7 +193,11 @@ public class ServerConnection implements Connection
public void rollback() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method rollback() not yet implemented.");
if (trxName != null) {
Trx trx = Trx.get(trxName, false);
if (trx != null)
trx.rollback(true);
}
}
public void rollback(Savepoint savepoint) throws SQLException
@ -186,7 +207,17 @@ public class ServerConnection implements Connection
public void setAutoCommit(boolean autoCommit) throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method setAutoCommit() not yet implemented.");
if (autoCommit) {
if (trxName != null) {
Trx trx = Trx.get(trxName, false);
if (trx != null)
trx.close();
}
} else {
if (trxName == null) {
trxName = Trx.createTrxName();
}
}
}
public void setCatalog(String catalog) throws SQLException
@ -201,22 +232,24 @@ public class ServerConnection implements Connection
public void setReadOnly(boolean readOnly) throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method setReadOnly() not yet implemented.");
}
public Savepoint setSavepoint() throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method setSavepoint() not yet implemented.");
return setSavepoint(null);
}
public Savepoint setSavepoint(String name) throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method setSavepoint() not yet implemented.");
if (trxName == null) {
trxName = Trx.createTrxName();
}
Trx trx = Trx.get(trxName, false);
return trx.setSavepoint(name);
}
public void setTransactionIsolation(int level) throws SQLException
{
throw new java.lang.UnsupportedOperationException ("Method setTransactionIsolation() not yet implemented.");
{
}
public void setTypeMap(Map<String, Class<?>> arg0) throws SQLException

View File

@ -163,6 +163,26 @@ public interface Server
public java.lang.String getStatus( )
throws java.rmi.RemoteException;
/**
* Set savepoint
* @param trxName
* @param savePointName
* @return true if success, false otherwise */
public org.compiere.util.SavepointVO setSavepoint( java.lang.String trxName,java.lang.String savePointName )
throws java.rmi.RemoteException;
/**
* Start remote transaction
* @param trxName */
public void startTransaction( java.lang.String trxName )
throws java.rmi.RemoteException;
/**
* Close remote transaction
* @param trxName */
public void closeTransaction( java.lang.String trxName )
throws java.rmi.RemoteException;
/**
* Commit the named transaction on server
* @param trxName
@ -177,6 +197,13 @@ public interface Server
public boolean rollback( java.lang.String trxName )
throws java.rmi.RemoteException;
/**
* Rollback the named transaction on server
* @param trxName
* @return true if success, false otherwise */
public boolean rollback( java.lang.String trxName,org.compiere.util.SavepointVO savePoint )
throws java.rmi.RemoteException;
/**
* Execute db proces on server
* @param processInfo

View File

@ -146,6 +146,23 @@ public interface ServerLocal
* @return Debugging information about the instance and its content */
public java.lang.String getStatus( ) ;
/**
* Set savepoint
* @param trxName
* @param savePointName
* @return true if success, false otherwise */
public org.compiere.util.CSavePoint setSavepoint( java.lang.String trxName,java.lang.String savePointName ) ;
/**
* Start remote transaction
* @param trxName */
public void startTransaction( java.lang.String trxName ) ;
/**
* Close remote transaction
* @param trxName */
public void closeTransaction( java.lang.String trxName ) ;
/**
* Commit the named transaction on server
* @param trxName
@ -158,6 +175,12 @@ public interface ServerLocal
* @return true if success, false otherwise */
public boolean rollback( java.lang.String trxName ) ;
/**
* Rollback the named transaction on server
* @param trxName
* @return true if success, false otherwise */
public boolean rollback( java.lang.String trxName,org.compiere.util.CSavePoint savePoint ) ;
/**
* Execute db proces on server
* @param processInfo

View File

@ -252,19 +252,12 @@ public class GridTabVO implements Evaluatee, Serializable
if (DB.isRemoteObjects() && CConnection.get().isAppsServerOK(false))
{
remoteCreateFields(mTabVO);
if (CConnection.get().isRMIoverHTTP())
{
return mTabVO.initFields;
}
else
{
if (mTabVO.initFields) return true;
}
return mTabVO.initFields;
}
if (CConnection.get().isRMIoverHTTP())
if (CConnection.get().isRMIoverHTTP() || CConnection.get().getDatabase().getStatus() == null)
{
CLogger.get().log(Level.SEVERE, "WAN - Application server not available.");
CLogger.get().log(Level.SEVERE, "Application server not available.");
return false;
}

View File

@ -153,7 +153,7 @@ public class CPreparedStatement extends CStatement implements PreparedStatement
throw new RuntimeException(ex);
}
// Try locally
if (!CConnection.get().isRMIoverHTTP())
if (!CConnection.get().isRMIoverHTTP() && CConnection.get().getDatabase().getStatus() != null)
{
log.warning("Execute locally");
p_stmt = local_getPreparedStatement (false, null); // shared connection
@ -223,7 +223,7 @@ public class CPreparedStatement extends CStatement implements PreparedStatement
throw new RuntimeException(ex);
}
// Try locally
if (!CConnection.get().isRMIoverHTTP())
if (!CConnection.get().isRMIoverHTTP() && CConnection.get().getDatabase().getStatus() != null)
{
log.warning("execute locally");
p_stmt = local_getPreparedStatement (false, null); // shared connection
@ -941,7 +941,7 @@ public class CPreparedStatement extends CStatement implements PreparedStatement
throw new RuntimeException(ex);
}
// Try locally
if (!CConnection.get().isRMIoverHTTP())
if (!CConnection.get().isRMIoverHTTP() && CConnection.get().getDatabase().getStatus() != null)
{
log.warning("Execute locally");
p_stmt = local_getPreparedStatement (false, null); // shared connection

View File

@ -165,7 +165,7 @@ public class CStatement implements Statement
throw new RuntimeException(ex);
}
// Try locally
if (!CConnection.get().isRMIoverHTTP())
if (!CConnection.get().isRMIoverHTTP() && CConnection.get().getDatabase().getStatus() != null)
{
log.warning("execute locally");
p_stmt = local_getStatement (false, null); // shared connection
@ -224,7 +224,7 @@ public class CStatement implements Statement
throw new RuntimeException(ex);
}
// Try locally
if (!CConnection.get().isRMIoverHTTP())
if (!CConnection.get().isRMIoverHTTP() && CConnection.get().getDatabase().getStatus() != null)
{
log.warning("execute locally");
p_stmt = local_getStatement (false, null); // shared connection
@ -928,7 +928,7 @@ public class CStatement implements Statement
throw new RuntimeException(ex);
}
// Try locally
if (!CConnection.get().isRMIoverHTTP())
if (!CConnection.get().isRMIoverHTTP() && CConnection.get().getDatabase().getStatus() != null)
{
log.warning("Execute locally");
p_stmt = local_getStatement(false, null); // shared connection

View File

@ -385,7 +385,8 @@ public final class DB
public static Connection createConnection (boolean autoCommit, int trxLevel)
{
//wan profile
if (CConnection.get().isRMIoverHTTP()) return null;
if (CConnection.get().isRMIoverHTTP() || CConnection.get().getDatabase().getStatus() == null)
return new ServerConnection();
Connection conn = s_cc.getConnection (autoCommit, trxLevel);
if (CLogMgt.isLevelFinest())
@ -417,7 +418,8 @@ public final class DB
public static Connection createConnection (boolean autoCommit, boolean readOnly, int trxLevel)
{
//wan profile
if (CConnection.get().isRMIoverHTTP()) return null;
if (CConnection.get().isRMIoverHTTP() || CConnection.get().getDatabase().getStatus() == null)
return new ServerConnection();
Connection conn = s_cc.getConnection (autoCommit, trxLevel);

View File

@ -0,0 +1,32 @@
package org.compiere.util;
import java.io.Serializable;
import java.sql.SQLException;
import java.sql.Savepoint;
public class SavepointVO implements Savepoint, Serializable {
int savePointId = -1;
String savePointName = null;
public SavepointVO(Savepoint sp) {
try {
savePointId = sp.getSavepointId();
} catch (SQLException e) {
}
try {
savePointName = sp.getSavepointName();
} catch (SQLException e) {
}
}
public int getSavepointId() throws SQLException {
return savePointId;
}
public String getSavepointName() throws SQLException {
return savePointName;
}
}

View File

@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.logging.*;
import org.compiere.db.CConnection;
import org.compiere.db.ServerConnection;
import org.compiere.interfaces.Server;
/**
@ -82,6 +83,8 @@ public class Trx implements VetoableChangeListener
if (prefix == null || prefix.length() == 0)
prefix = "Trx";
prefix += "_" + UUID.randomUUID(); //System.currentTimeMillis();
//create transaction entry
Trx.get(prefix, true);
return prefix;
} // createTrxName
@ -107,13 +110,16 @@ public class Trx implements VetoableChangeListener
/**
* Transaction Constructor
* @param trxName unique name
@param con optional connection
* @param con optional connection ( ignore for remote transaction )
* */
private Trx (String trxName, Connection con)
{
// log.info (trxName);
setTrxName (trxName);
setConnection (con);
//create remote transaction immediately
if (DB.isRemoteObjects())
this.start();
} // Trx
/** Logger */
@ -121,7 +127,6 @@ public class Trx implements VetoableChangeListener
private Connection m_connection = null;
private String m_trxName = null;
//private Savepoint m_savepoint = null;
private boolean m_active = false;
/**
@ -132,14 +137,13 @@ public class Trx implements VetoableChangeListener
{
log.log(Level.ALL, "Active=" + isActive() + ", Connection=" + m_connection);
//wan profile
if (DB.isRemoteObjects()) return null;
if (DB.isRemoteObjects())
return new ServerConnection(getTrxName());
if (m_connection == null) // get new Connection
setConnection(DB.createConnection(false, Connection.TRANSACTION_READ_COMMITTED));
if (!isActive())
start();
// System.err.println ("Trx.getConnection - " + m_name + ": "+ m_connection);
// Trace.printStack();
return m_connection;
} // getConnection
@ -189,40 +193,35 @@ public class Trx implements VetoableChangeListener
*/
public boolean start()
{
if (/*m_savepoint != null || */m_active)
if (m_active)
{
log.warning("Trx in progress " + m_trxName /*+ " - " + m_savepoint*/);
log.warning("Trx in progress " + m_trxName);
return false;
}
if (DB.isRemoteObjects()) {
startRemoteTransaction();
}
m_active = true;
/*
try
{
if (m_connection != null)
{
//m_savepoint = m_connection.setSavepoint(m_trxName);
log.info("**** " + getTrxName());
}
}
catch (SQLException e)
{
log.log(Level.SEVERE, m_trxName, e);
//m_savepoint = null;
return false;
}*/
return true;
} // startTrx
/**
* Get Savepoint
* @return savepoint or null
*/
/*
public Savepoint getSavepoint()
{
return m_savepoint;
} // getSavepoint*/
private void startRemoteTransaction() {
Server server = CConnection.get().getServer();
try
{
if (server != null)
{ // See ServerBean
server.startTransaction(getTrxName());
}
log.log(Level.WARNING, "AppsServer not found");
}
catch (RemoteException ex)
{
log.log(Level.SEVERE, "AppsServer error", ex);
}
}
/**
* Transaction is Active
* @return true if transaction active
@ -250,14 +249,8 @@ public class Trx implements VetoableChangeListener
{
if (m_connection != null)
{
/*
if (m_savepoint == null)
m_connection.rollback();
else
m_connection.rollback(m_savepoint);*/
m_connection.rollback();
log.info ("**** " + m_trxName);
//m_savepoint = null;
m_active = false;
return true;
}
@ -267,12 +260,10 @@ public class Trx implements VetoableChangeListener
log.log(Level.SEVERE, m_trxName, e);
if (throwException)
{
//m_savepoint = null;
m_active = false;
throw e;
}
}
//m_savepoint = null;
m_active = false;
return false;
} // rollback
@ -290,6 +281,37 @@ public class Trx implements VetoableChangeListener
}
}
/**
* Rollback
* @param throwException if true, re-throws exception
* @return true if success, false if failed or transaction already rollback
*/
public boolean rollback(Savepoint savepoint) throws SQLException
{
//remote
if (DB.isRemoteObjects())
{
return remote_rollback(savepoint);
}
//local
try
{
if (m_connection != null)
{
m_connection.rollback(savepoint);
log.info ("**** " + m_trxName);
return true;
}
}
catch (SQLException e)
{
log.log(Level.SEVERE, m_trxName, e);
throw e;
}
return false;
} // rollback
/**
* Rollback a remote transaction
* @param throwException
@ -334,31 +356,47 @@ public class Trx implements VetoableChangeListener
}
/**
* Release savepoint
* @return true if released
*
public boolean release()
* Rollback a remote transaction
* @param throwException
* @return true if success, false otherwise
* @throws SQLException
*/
private boolean remote_rollback(Savepoint savepoint) throws SQLException
{
if (m_connection == null)
return false;
m_active = false;
if (m_savepoint == null)
return true;
Server server = CConnection.get().getServer();
try
{
getConnection().releaseSavepoint(m_savepoint);
log.fine("release **** " + getName());
m_savepoint = null;
if (server != null)
{
SavepointVO sp = null;
if (savepoint instanceof SavepointVO)
sp = (SavepointVO)savepoint;
else
sp = new SavepointVO(savepoint);
return server.rollback(m_trxName, sp);
}
log.log(Level.SEVERE, "AppsServer not found");
throw new SQLException("AppsServer not found");
}
catch (SQLException e)
catch (RemoteException ex)
{
log.log(Level.SEVERE, "release ****", e);
m_savepoint = null;
return false;
log.log(Level.SEVERE, "AppsServer error", ex);
if (ex.getCause() instanceof RuntimeException)
{
RuntimeException r = (RuntimeException)ex.getCause();
if (r.getCause() instanceof SQLException)
throw (SQLException)r.getCause();
else if ( r.getCause() != null )
throw new SQLException("Application server exception - " + r.getCause().getMessage());
else
throw new SQLException("Application server exception - " + r.getMessage());
}
else
{
throw new SQLException("Application server exception - " + ex.getMessage());
}
}
return true;
} // release
}
/**
* Commit
* @param throwException if true, re-throws exception
@ -379,7 +417,6 @@ public class Trx implements VetoableChangeListener
{
m_connection.commit();
log.info ("**** " + m_trxName);
//m_savepoint = null;
m_active = false;
return true;
}
@ -389,12 +426,10 @@ public class Trx implements VetoableChangeListener
log.log(Level.SEVERE, m_trxName, e);
if (throwException)
{
//m_savepoint = null;
m_active = false;
throw e;
}
}
//m_savepoint = null;
m_active = false;
return false;
} // commit
@ -467,11 +502,19 @@ public class Trx implements VetoableChangeListener
{
if (s_cache != null)
s_cache.remove(getTrxName());
//
//remote
if (DB.isRemoteObjects()) {
closeRemoteTransaction();
m_active = false;
return true;
}
//local
if (m_connection == null)
return true;
if (/*m_savepoint != null || */isActive())
if (isActive())
commit();
// Close Connection
@ -483,13 +526,78 @@ public class Trx implements VetoableChangeListener
{
log.log(Level.SEVERE, m_trxName, e);
}
//m_savepoint = null;
m_connection = null;
m_active = false;
log.config(m_trxName);
return true;
} // close
private void closeRemoteTransaction() {
Server server = CConnection.get().getServer();
try
{
if (server != null)
{ // See ServerBean
server.closeTransaction(getTrxName());
}
log.log(Level.WARNING, "AppsServer not found");
}
catch (RemoteException ex)
{
log.log(Level.SEVERE, "AppsServer error", ex);
}
}
/**
*
* @param name
* @return
* @throws SQLException
*/
public Savepoint setSavepoint(String name) throws SQLException {
//remote
if (DB.isRemoteObjects())
{
return setRemoteSavepoint(name);
}
if(m_connection != null) {
return m_connection.setSavepoint(name);
} else {
return null;
}
}
private Savepoint setRemoteSavepoint(String name) throws SQLException {
Server server = CConnection.get().getServer();
try
{
if (server != null)
{ // See ServerBean
return server.setSavepoint(m_trxName, name);
}
log.log(Level.SEVERE, "AppsServer not found");
throw new SQLException("AppsServer not found");
}
catch (RemoteException ex)
{
log.log(Level.SEVERE, "AppsServer error", ex);
if (ex.getCause() instanceof RuntimeException)
{
RuntimeException r = (RuntimeException)ex.getCause();
if (r.getCause() instanceof SQLException)
throw (SQLException)r.getCause();
else if ( r.getCause() != null )
throw new SQLException("Application server exception - " + r.getCause().getMessage());
else
throw new SQLException("Application server exception - " + r.getMessage());
}
else
throw new SQLException("Application server exception - " + ex.getMessage());
}
}
/**
* String Representation
* @return info