Merge release-7.1 into master

This commit is contained in:
Carlos Ruiz 2020-03-10 13:51:20 +01:00
commit 3ce8f5c7bb
3 changed files with 149 additions and 123 deletions

View File

@ -401,6 +401,18 @@ public final class DB
return createConnection(true, true, Connection.TRANSACTION_READ_COMMITTED); // see below
} // getConnectionRO
/**
* Return a replica connection if possible, otherwise read committed, read/only from pool.
* @return Connection (r/o)
*/
public static Connection getReportingConnectionRO ()
{
Connection conn = DBReadReplica.getConnectionRO();
if (conn == null)
conn = getConnectionRO();
return conn;
} // getReportingConnectionRO
/**
* Create new Connection.
* The connection must be closed explicitly by the application

View File

@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import org.adempiere.exceptions.AdempiereException;
import org.adempiere.exceptions.DBException;
import org.compiere.model.MSysConfig;
import org.compiere.model.MSystem;
@ -61,132 +62,18 @@ public class DBReadReplica {
*
*/
public static PreparedStatement prepareNormalReadReplicaStatement(String sql, int resultSetType, int resultSetConcurrency, String trxName) {
String replicaURLsConfig = MSysConfig.getValue(MSysConfig.DB_READ_REPLICA_URLS); // list of JDBC URLs separated by |
if (Util.isEmpty(replicaURLsConfig, true))
return null;
int DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS, 5000);
int DB_READ_REPLICA_NORMAL_MAX_ITERATIONS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_MAX_ITERATIONS, 3);
setUserPass();
Timestamp lastTs = setMasterVerificationTimestamp();
String masterDBAddress = MSystem.get(Env.getCtx()).getDBAddress();
List<URLReplicaConnection> urlConnList = new ArrayList<URLReplicaConnection>();
String[] replicaURLs = replicaURLsConfig.split("\\|");
int index = 0;
int length = replicaURLs.length;
for (int i = 0; i < length; i++) {
String replicaURL = replicaURLs[i].trim();
index = (i + shift) % length;
URLReplicaConnection urc = new URLReplicaConnection(replicaURL, index);
urlConnList.add(urc);
}
shift++;
if (shift >= length)
shift = 0;
// sort URLs by index - for load balancing
Collections.sort(urlConnList);
Connection conn = getConnectionRO();
PreparedStatement statementToReturn = null;
String usedReplicaURL = null;
for (int i = 0; i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS; i++) {
for (URLReplicaConnection urlConn : urlConnList) {
if (! urlConn.isUsable())
continue; // next urlConn
String replicaURL = urlConn.getReplicaURL();
Connection conn = urlConn.getConnection();
if (conn == null && urlConn.isUsable()) {
conn = tryConnect(replicaURL);
}
if (conn == null) {
urlConn.setUsable(false);
continue; // next urlConn
}
urlConn.setConnection(conn);
String replicaDBAddress = getReplicaDBAddress(conn);
if (!masterDBAddress.equals(replicaDBAddress)) {
log.warning("Replica DB Address doesn't match with master DB Address -> " + replicaURL);
urlConn.setUsable(false);
continue; // next urlConn
}
Timestamp replicaTs = getReplicaVerificationTimestamp(conn);
if (replicaTs == null) {
log.warning("Could not get replica verification timestamp -> " + replicaURL);
urlConn.setUsable(false);
continue; // next urlConn
}
if (replicaTs.before(lastTs)) {
// not usable yet
continue; // next urlConn
}
if (conn != null) {
try {
sql = DB.getDatabase().convertStatement(sql);
statementToReturn = conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
usedReplicaURL = urlConn.getReplicaURL();
} catch (SQLException e) {
log.warning("Error preparing statement in replica -> " + replicaURL + " / SQL = " + sql);
log.warning("Error preparing statement in replica -> SQL = " + sql);
statementToReturn = null;
urlConn.setUsable(false);
}
} // end for urlConnList
if (statementToReturn != null)
break; // break iterations
boolean noMoreUsables = true;
for (URLReplicaConnection urlConn : urlConnList) {
if (urlConn.isUsable()) {
noMoreUsables = false;
break;
throw new AdempiereException(e);
}
}
if (noMoreUsables)
break;
if (i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS-1) {
try {
log.warning("Waiting " + DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS + " milliseconds for replication to sync");
Thread.sleep(DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS);
} catch (InterruptedException e) {
;
}
}
} // end for iterations
if (statementToReturn == null)
log.warning("Abandoning replicas: none usable or max wait reached without sync");
// close any connection not used in the statement to return
for (URLReplicaConnection urlConn : urlConnList) {
Connection conn = urlConn.getConnection();
if (conn == null)
continue;
try {
if (statementToReturn != null && conn == statementToReturn.getConnection())
continue; // next connection
} catch (SQLException e) {
log.warning("Error getting connection from statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage());
}
try {
conn.close();
} catch (SQLException e) {
log.warning("Could not close connection statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage());
}
}
if (statementToReturn != null)
log.warning("Using replica for statement, URL = " + usedReplicaURL + " / SQL = " + sql); // WARNING during test phase, change to info before moving to production
return statementToReturn;
}
@ -229,6 +116,7 @@ public class DBReadReplica {
Connection conn = null;
try {
conn = DB.getDatabase(replicaURL).getDriverConnection(replicaURL, m_user, m_pass);
conn.setReadOnly(true);
} catch (SQLException e) {
log.warning("Could not get a connection to " + replicaURL + ", cause = " + e.getLocalizedMessage());
conn = null;
@ -346,4 +234,127 @@ public class DBReadReplica {
}
public static Connection getConnectionRO() {
String replicaURLsConfig = MSysConfig.getValue(MSysConfig.DB_READ_REPLICA_URLS); // list of JDBC URLs separated by |
if (Util.isEmpty(replicaURLsConfig, true))
return null;
int DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS, 5000);
int DB_READ_REPLICA_NORMAL_MAX_ITERATIONS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_MAX_ITERATIONS, 3);
setUserPass();
Timestamp lastTs = setMasterVerificationTimestamp();
String masterDBAddress = MSystem.get(Env.getCtx()).getDBAddress();
List<URLReplicaConnection> urlConnList = new ArrayList<URLReplicaConnection>();
String[] replicaURLs = replicaURLsConfig.split("\\|");
int index = 0;
int length = replicaURLs.length;
for (int i = 0; i < length; i++) {
String replicaURL = replicaURLs[i].trim();
index = (i + shift) % length;
URLReplicaConnection urc = new URLReplicaConnection(replicaURL, index);
urlConnList.add(urc);
}
shift++;
if (shift >= length)
shift = 0;
// sort URLs by index - for load balancing
Collections.sort(urlConnList);
String usedReplicaURL = null;
Connection retConn = null;
for (int i = 0; i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS; i++) {
for (URLReplicaConnection urlConn : urlConnList) {
if (! urlConn.isUsable())
continue; // next urlConn
String replicaURL = urlConn.getReplicaURL();
Connection conn = urlConn.getConnection();
if (conn == null && urlConn.isUsable()) {
conn = tryConnect(replicaURL);
}
if (conn == null) {
urlConn.setUsable(false);
continue; // next urlConn
}
urlConn.setConnection(conn);
String replicaDBAddress = getReplicaDBAddress(conn);
if (!masterDBAddress.equals(replicaDBAddress)) {
log.warning("Replica DB Address doesn't match with master DB Address -> " + replicaURL);
urlConn.setUsable(false);
continue; // next urlConn
}
Timestamp replicaTs = getReplicaVerificationTimestamp(conn);
if (replicaTs == null) {
log.warning("Could not get replica verification timestamp -> " + replicaURL);
urlConn.setUsable(false);
continue; // next urlConn
}
if (replicaTs.before(lastTs)) {
// not usable yet
continue; // next urlConn
}
if (conn != null) {
retConn = conn;
usedReplicaURL = urlConn.getReplicaURL();
break; // break urlConnList
}
} // end for urlConnList
if (retConn != null) {
break; // break iterations
}
boolean noMoreUsables = true;
for (URLReplicaConnection urlConn : urlConnList) {
if (urlConn.isUsable()) {
noMoreUsables = false;
break;
}
}
if (noMoreUsables)
break;
if (i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS-1) {
try {
log.warning("Waiting " + DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS + " milliseconds for replication to sync");
Thread.sleep(DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS);
} catch (InterruptedException e) {
;
}
}
} // end for iterations
if (retConn == null)
log.warning("Abandoning replicas: none usable or max wait reached without sync");
// close any connection not used in the statement to return
for (URLReplicaConnection urlConn : urlConnList) {
Connection conn = urlConn.getConnection();
if (conn == null)
continue;
if (retConn != null && retConn == conn)
continue; // next connection
try {
conn.close();
} catch (SQLException e) {
log.warning("Could not close connection statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage());
}
}
if (retConn != null)
log.warning("Using replica for connection, URL = " + usedReplicaURL);
return retConn;
}
}

View File

@ -342,7 +342,7 @@ public class ReportStarter implements ProcessCall, ClientProcess
*/
protected Connection getConnection()
{
return DB.getConnectionRW();
return DB.getReportingConnectionRO();
}
/**
@ -647,6 +647,9 @@ public class ReportStarter implements ProcessCall, ClientProcess
JRSwapFileVirtualizer virtualizer = null;
int maxPages = MSysConfig.getIntValue(MSysConfig.JASPER_SWAP_MAX_PAGES, DEFAULT_SWAP_MAX_PAGES);
try {
if (trx != null)
conn = trx.getConnection();
else
conn = getConnection();
String swapPath = System.getProperty("java.io.tmpdir");