diff --git a/org.adempiere.base/src/org/compiere/util/DB.java b/org.adempiere.base/src/org/compiere/util/DB.java index 4a814f549d..f173211aef 100644 --- a/org.adempiere.base/src/org/compiere/util/DB.java +++ b/org.adempiere.base/src/org/compiere/util/DB.java @@ -401,6 +401,18 @@ public final class DB return createConnection(true, true, Connection.TRANSACTION_READ_COMMITTED); // see below } // getConnectionRO + /** + * Return a replica connection if possible, otherwise read committed, read/only from pool. + * @return Connection (r/o) + */ + public static Connection getReportingConnectionRO () + { + Connection conn = DBReadReplica.getConnectionRO(); + if (conn == null) + conn = getConnectionRO(); + return conn; + } // getReportingConnectionRO + /** * Create new Connection. * The connection must be closed explicitly by the application diff --git a/org.adempiere.base/src/org/compiere/util/DBReadReplica.java b/org.adempiere.base/src/org/compiere/util/DBReadReplica.java index 70b0b73b83..9e3edb9af1 100644 --- a/org.adempiere.base/src/org/compiere/util/DBReadReplica.java +++ b/org.adempiere.base/src/org/compiere/util/DBReadReplica.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.List; import java.util.logging.Level; +import org.adempiere.exceptions.AdempiereException; import org.adempiere.exceptions.DBException; import org.compiere.model.MSysConfig; import org.compiere.model.MSystem; @@ -61,132 +62,18 @@ public class DBReadReplica { * */ public static PreparedStatement prepareNormalReadReplicaStatement(String sql, int resultSetType, int resultSetConcurrency, String trxName) { - String replicaURLsConfig = MSysConfig.getValue(MSysConfig.DB_READ_REPLICA_URLS); // list of JDBC URLs separated by | - if (Util.isEmpty(replicaURLsConfig, true)) - return null; - int DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS, 5000); - int DB_READ_REPLICA_NORMAL_MAX_ITERATIONS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_MAX_ITERATIONS, 3); - - setUserPass(); - - Timestamp lastTs = setMasterVerificationTimestamp(); - String masterDBAddress = MSystem.get(Env.getCtx()).getDBAddress(); - - List urlConnList = new ArrayList(); - - String[] replicaURLs = replicaURLsConfig.split("\\|"); - int index = 0; - int length = replicaURLs.length; - for (int i = 0; i < length; i++) { - String replicaURL = replicaURLs[i].trim(); - index = (i + shift) % length; - URLReplicaConnection urc = new URLReplicaConnection(replicaURL, index); - urlConnList.add(urc); - } - shift++; - if (shift >= length) - shift = 0; - - // sort URLs by index - for load balancing - Collections.sort(urlConnList); - + Connection conn = getConnectionRO(); PreparedStatement statementToReturn = null; - String usedReplicaURL = null; - for (int i = 0; i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS; i++) { - - for (URLReplicaConnection urlConn : urlConnList) { - - if (! urlConn.isUsable()) - continue; // next urlConn - - String replicaURL = urlConn.getReplicaURL(); - - Connection conn = urlConn.getConnection(); - if (conn == null && urlConn.isUsable()) { - conn = tryConnect(replicaURL); - } - if (conn == null) { - urlConn.setUsable(false); - continue; // next urlConn - } - urlConn.setConnection(conn); - - String replicaDBAddress = getReplicaDBAddress(conn); - if (!masterDBAddress.equals(replicaDBAddress)) { - log.warning("Replica DB Address doesn't match with master DB Address -> " + replicaURL); - urlConn.setUsable(false); - continue; // next urlConn - } - - Timestamp replicaTs = getReplicaVerificationTimestamp(conn); - if (replicaTs == null) { - log.warning("Could not get replica verification timestamp -> " + replicaURL); - urlConn.setUsable(false); - continue; // next urlConn - } - - if (replicaTs.before(lastTs)) { - // not usable yet - continue; // next urlConn - } - try { - sql = DB.getDatabase().convertStatement(sql); - statementToReturn = conn.prepareStatement(sql, resultSetType, resultSetConcurrency); - usedReplicaURL = urlConn.getReplicaURL(); - } catch (SQLException e) { - log.warning("Error preparing statement in replica -> " + replicaURL + " / SQL = " + sql); - statementToReturn = null; - urlConn.setUsable(false); - } - } // end for urlConnList - - if (statementToReturn != null) - break; // break iterations - - boolean noMoreUsables = true; - for (URLReplicaConnection urlConn : urlConnList) { - if (urlConn.isUsable()) { - noMoreUsables = false; - break; - } - } - - if (noMoreUsables) - break; - - if (i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS-1) { - try { - log.warning("Waiting " + DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS + " milliseconds for replication to sync"); - Thread.sleep(DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS); - } catch (InterruptedException e) { - ; - } - } - } // end for iterations - - if (statementToReturn == null) - log.warning("Abandoning replicas: none usable or max wait reached without sync"); - - // close any connection not used in the statement to return - for (URLReplicaConnection urlConn : urlConnList) { - Connection conn = urlConn.getConnection(); - if (conn == null) - continue; + if (conn != null) { try { - if (statementToReturn != null && conn == statementToReturn.getConnection()) - continue; // next connection + sql = DB.getDatabase().convertStatement(sql); + statementToReturn = conn.prepareStatement(sql, resultSetType, resultSetConcurrency); } catch (SQLException e) { - log.warning("Error getting connection from statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage()); - } - try { - conn.close(); - } catch (SQLException e) { - log.warning("Could not close connection statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage()); + log.warning("Error preparing statement in replica -> SQL = " + sql); + statementToReturn = null; + throw new AdempiereException(e); } } - - if (statementToReturn != null) - log.warning("Using replica for statement, URL = " + usedReplicaURL + " / SQL = " + sql); // WARNING during test phase, change to info before moving to production return statementToReturn; } @@ -229,6 +116,7 @@ public class DBReadReplica { Connection conn = null; try { conn = DB.getDatabase(replicaURL).getDriverConnection(replicaURL, m_user, m_pass); + conn.setReadOnly(true); } catch (SQLException e) { log.warning("Could not get a connection to " + replicaURL + ", cause = " + e.getLocalizedMessage()); conn = null; @@ -346,4 +234,127 @@ public class DBReadReplica { } + public static Connection getConnectionRO() { + String replicaURLsConfig = MSysConfig.getValue(MSysConfig.DB_READ_REPLICA_URLS); // list of JDBC URLs separated by | + if (Util.isEmpty(replicaURLsConfig, true)) + return null; + int DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS, 5000); + int DB_READ_REPLICA_NORMAL_MAX_ITERATIONS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_MAX_ITERATIONS, 3); + + setUserPass(); + + Timestamp lastTs = setMasterVerificationTimestamp(); + String masterDBAddress = MSystem.get(Env.getCtx()).getDBAddress(); + + List urlConnList = new ArrayList(); + + String[] replicaURLs = replicaURLsConfig.split("\\|"); + int index = 0; + int length = replicaURLs.length; + for (int i = 0; i < length; i++) { + String replicaURL = replicaURLs[i].trim(); + index = (i + shift) % length; + URLReplicaConnection urc = new URLReplicaConnection(replicaURL, index); + urlConnList.add(urc); + } + shift++; + if (shift >= length) + shift = 0; + + // sort URLs by index - for load balancing + Collections.sort(urlConnList); + + String usedReplicaURL = null; + Connection retConn = null; + for (int i = 0; i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS; i++) { + + for (URLReplicaConnection urlConn : urlConnList) { + + if (! urlConn.isUsable()) + continue; // next urlConn + + String replicaURL = urlConn.getReplicaURL(); + + Connection conn = urlConn.getConnection(); + if (conn == null && urlConn.isUsable()) { + conn = tryConnect(replicaURL); + } + if (conn == null) { + urlConn.setUsable(false); + continue; // next urlConn + } + urlConn.setConnection(conn); + + String replicaDBAddress = getReplicaDBAddress(conn); + if (!masterDBAddress.equals(replicaDBAddress)) { + log.warning("Replica DB Address doesn't match with master DB Address -> " + replicaURL); + urlConn.setUsable(false); + continue; // next urlConn + } + + Timestamp replicaTs = getReplicaVerificationTimestamp(conn); + if (replicaTs == null) { + log.warning("Could not get replica verification timestamp -> " + replicaURL); + urlConn.setUsable(false); + continue; // next urlConn + } + + if (replicaTs.before(lastTs)) { + // not usable yet + continue; // next urlConn + } + if (conn != null) { + retConn = conn; + usedReplicaURL = urlConn.getReplicaURL(); + break; // break urlConnList + } + } // end for urlConnList + + if (retConn != null) { + break; // break iterations + } + + boolean noMoreUsables = true; + for (URLReplicaConnection urlConn : urlConnList) { + if (urlConn.isUsable()) { + noMoreUsables = false; + break; + } + } + + if (noMoreUsables) + break; + + if (i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS-1) { + try { + log.warning("Waiting " + DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS + " milliseconds for replication to sync"); + Thread.sleep(DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS); + } catch (InterruptedException e) { + ; + } + } + } // end for iterations + + if (retConn == null) + log.warning("Abandoning replicas: none usable or max wait reached without sync"); + + // close any connection not used in the statement to return + for (URLReplicaConnection urlConn : urlConnList) { + Connection conn = urlConn.getConnection(); + if (conn == null) + continue; + if (retConn != null && retConn == conn) + continue; // next connection + try { + conn.close(); + } catch (SQLException e) { + log.warning("Could not close connection statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage()); + } + } + + if (retConn != null) + log.warning("Using replica for connection, URL = " + usedReplicaURL); + return retConn; + } + } diff --git a/org.adempiere.report.jasper/src/org/adempiere/report/jasper/ReportStarter.java b/org.adempiere.report.jasper/src/org/adempiere/report/jasper/ReportStarter.java index 0b8c8decd5..a67255d646 100644 --- a/org.adempiere.report.jasper/src/org/adempiere/report/jasper/ReportStarter.java +++ b/org.adempiere.report.jasper/src/org/adempiere/report/jasper/ReportStarter.java @@ -342,7 +342,7 @@ public class ReportStarter implements ProcessCall, ClientProcess */ protected Connection getConnection() { - return DB.getConnectionRW(); + return DB.getReportingConnectionRO(); } /** @@ -647,7 +647,10 @@ public class ReportStarter implements ProcessCall, ClientProcess JRSwapFileVirtualizer virtualizer = null; int maxPages = MSysConfig.getIntValue(MSysConfig.JASPER_SWAP_MAX_PAGES, DEFAULT_SWAP_MAX_PAGES); try { - conn = getConnection(); + if (trx != null) + conn = trx.getConnection(); + else + conn = getConnection(); String swapPath = System.getProperty("java.io.tmpdir"); JRSwapFile swapFile = new JRSwapFile(swapPath, 1024, 1024);