IDEMPIERE-3850 Performance improvement: implement reporting from read-only replica (from TrekGlobal)

This commit is contained in:
Carlos Ruiz 2018-12-18 14:22:17 +01:00
parent 610c11baa0
commit 1fd56b7609
7 changed files with 432 additions and 2 deletions

View File

@ -0,0 +1,9 @@
-- 1008281 - Implement reporting from read-only replica
CREATE TABLE dbreplicasyncverifier (lastupdate date);
INSERT INTO dbreplicasyncverifier values (to_date('1900-01-01 00:00:00', 'yyyy-mm-dd HH24:MI:SS'));
SELECT register_migration_script('201708031855_1008281.sql') FROM dual
;

View File

@ -0,0 +1,13 @@
-- 1008281 - Implement reporting from read-only replica
CREATE TABLE dbreplicasyncverifier (lastupdate timestamp);
INSERT INTO dbreplicasyncverifier values (to_timestamp('1900-01-01 00:00:00', 'yyyy-mm-dd HH24:MI:SS'));
CREATE OR REPLACE RULE insert_dual AS ON INSERT TO dual DO INSTEAD NOTHING;
CREATE OR REPLACE RULE delete_dual AS ON DELETE TO dual DO INSTEAD NOTHING;
SELECT register_migration_script('201708031855_1008281.sql') FROM dual
;

View File

@ -95,6 +95,8 @@ public class StatementProxy implements InvocationHandler {
return null;
} else if (name.equals("getSql") && (args == null || args.length == 0)) {
return getSql();
} else if (name.equals("equals") && (args != null && args.length == 1)) {
return equals(args[0]);
}
String logSql = null;

View File

@ -42,7 +42,7 @@ public class MSysConfig extends X_AD_SysConfig
/**
*
*/
private static final long serialVersionUID = -1273442365045945366L;
private static final long serialVersionUID = 2856526441538434702L;
public static final String ADDRESS_VALIDATION = "ADDRESS_VALIDATION";
public static final String ALERT_SEND_ATTACHMENT_AS_XLS = "ALERT_SEND_ATTACHMENT_AS_XLS";
@ -80,6 +80,9 @@ public class MSysConfig extends X_AD_SysConfig
public static final String CLIENT_ACCOUNTING = "CLIENT_ACCOUNTING";
public static final String DEFAULT_COA_PATH = "DEFAULT_COA_PATH";
public static final String DEFAULT_ENTITYTYPE = "DEFAULT_ENTITYTYPE"; // used as default in entity type columns with get_sysconfig
public static final String DB_READ_REPLICA_NORMAL_MAX_ITERATIONS = "DB_READ_REPLICA_NORMAL_MAX_ITERATIONS";
public static final String DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS = "DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS";
public static final String DB_READ_REPLICA_URLS = "DB_READ_REPLICA_URLS";
public static final String DICTIONARY_ID_COMMENTS = "DICTIONARY_ID_COMMENTS";
public static final String DICTIONARY_ID_PASSWORD = "DICTIONARY_ID_PASSWORD";
public static final String DICTIONARY_ID_USE_CENTRALIZED_ID = "DICTIONARY_ID_USE_CENTRALIZED_ID";

View File

@ -830,7 +830,7 @@ public class DataEngine
ResultSet rs = null;
try
{
pstmt = DB.prepareStatement(pd.getSQL(), m_trxName);
pstmt = DB.prepareNormalReadReplicaStatement(pd.getSQL(), m_trxName);
rs = pstmt.executeQuery();
// Row Loop
while (rs.next())

View File

@ -33,6 +33,7 @@ import java.sql.Timestamp;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
@ -2114,6 +2115,15 @@ public final class DB
} catch (SQLException e) {
;
}
if (readReplicaStatements.contains(st)) {
try {
DBReadReplica.closeReadReplicaStatement(st);
} catch (Exception e) {
;
} finally {
readReplicaStatements.remove(st);
}
}
}
/**
@ -2524,4 +2534,48 @@ public final class DB
return rowsArray;
}
/** Read Replica Statements List */
private static final List<PreparedStatement> readReplicaStatements = Collections.synchronizedList(new ArrayList<PreparedStatement>());
/**
* Prepare Read Replica Statement
* @param sql sql statement
* @param trxName transaction
* @return Prepared Statement (from replica if possible, otherwise normal statement)
*/
public static PreparedStatement prepareNormalReadReplicaStatement(String sql, String trxName) {
int concurrency = ResultSet.CONCUR_READ_ONLY;
String upper = sql.toUpperCase();
if (upper.startsWith("UPDATE ") || upper.startsWith("DELETE "))
concurrency = ResultSet.CONCUR_UPDATABLE;
return prepareNormalReadReplicaStatement(sql, ResultSet.TYPE_FORWARD_ONLY, concurrency, trxName);
}
/**
* Prepare Read Replica Statement
* @param sql sql statement
* @param resultSetType - ResultSet.TYPE_FORWARD_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.TYPE_SCROLL_SENSITIVE
* @param resultSetConcurrency - ResultSet.CONCUR_READ_ONLY or ResultSet.CONCUR_UPDATABLE
* @param trxName transaction name
* @return Prepared Statement (from replica if possible, otherwise normal statement)
*/
private static PreparedStatement prepareNormalReadReplicaStatement(String sql, int resultSetType, int resultSetConcurrency, String trxName) {
if (sql == null || sql.length() == 0)
throw new IllegalArgumentException("No SQL");
boolean useReadReplica = MSysConfig.getValue(MSysConfig.DB_READ_REPLICA_URLS) != null;
if ( trxName == null
&& useReadReplica
&& resultSetType == ResultSet.TYPE_FORWARD_ONLY
&& resultSetConcurrency == ResultSet.CONCUR_READ_ONLY) {
// this is a candidate for a read replica connection (read-only, forward-only, no-trx), try to obtain one, otherwise fallback to normal
PreparedStatement stmt = DBReadReplica.prepareNormalReadReplicaStatement(sql, resultSetType, resultSetConcurrency, trxName);
if (stmt != null) {
readReplicaStatements.add(stmt);
return stmt;
}
}
//
return ProxyFactory.newCPreparedStatement(resultSetType, resultSetConcurrency, sql, trxName);
}
} // DB

View File

@ -0,0 +1,349 @@
/**********************************************************************
* Copyright (C) Contributors *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software *
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, *
* MA 02110-1301, USA. *
* *
* Contributors: *
* - Trek Global *
* - Carlos Ruiz - globalqss *
**********************************************************************/
package org.compiere.util;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import org.adempiere.exceptions.DBException;
import org.compiere.model.MSysConfig;
import org.compiere.model.MSystem;
public class DBReadReplica {
private static String m_user = null;
private static String m_pass = null;
final private static String sqlValidateSync = "SELECT lastupdate FROM dbreplicasyncverifier";
final private static String sqlUpdateSync = "UPDATE dbreplicasyncverifier SET lastupdate=SYSDATE";
final private static String sqlValidateDBAddress = "SELECT DBAddress FROM AD_System";
private volatile static int shift = 0; // for load balancing between different replicas
/** Logger */
private static CLogger log = CLogger.getCLogger (DBReadReplica.class);
/**
* Prepare Normal Read Replica Statement
* @param sql sql statement
* @param resultSetType - ResultSet.TYPE_FORWARD_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.TYPE_SCROLL_SENSITIVE
* @param resultSetConcurrency - ResultSet.CONCUR_READ_ONLY or ResultSet.CONCUR_UPDATABLE
* @param trxName transaction
* @return Prepared Statement (from replica if possible, otherwise null)
*
* @TODO: Add profiles fast and slow for other read replica statements
*
*/
public static PreparedStatement prepareNormalReadReplicaStatement(String sql, int resultSetType, int resultSetConcurrency, String trxName) {
String replicaURLsConfig = MSysConfig.getValue(MSysConfig.DB_READ_REPLICA_URLS); // list of JDBC URLs separated by |
if (Util.isEmpty(replicaURLsConfig, true))
return null;
int DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS, 5000);
int DB_READ_REPLICA_NORMAL_MAX_ITERATIONS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_MAX_ITERATIONS, 3);
setUserPass();
Timestamp lastTs = setMasterVerificationTimestamp();
String masterDBAddress = MSystem.get(Env.getCtx()).getDBAddress();
List<URLReplicaConnection> urlConnList = new ArrayList<URLReplicaConnection>();
String[] replicaURLs = replicaURLsConfig.split("\\|");
int index = 0;
int length = replicaURLs.length;
for (int i = 0; i < length; i++) {
String replicaURL = replicaURLs[i].trim();
index = (i + shift) % length;
URLReplicaConnection urc = new URLReplicaConnection(replicaURL, index);
urlConnList.add(urc);
}
shift++;
if (shift >= length)
shift = 0;
// sort URLs by index - for load balancing
Collections.sort(urlConnList);
PreparedStatement statementToReturn = null;
String usedReplicaURL = null;
for (int i = 0; i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS; i++) {
for (URLReplicaConnection urlConn : urlConnList) {
if (! urlConn.isUsable())
continue; // next urlConn
String replicaURL = urlConn.getReplicaURL();
Connection conn = urlConn.getConnection();
if (conn == null && urlConn.isUsable()) {
conn = tryConnect(replicaURL);
}
if (conn == null) {
urlConn.setUsable(false);
continue; // next urlConn
}
urlConn.setConnection(conn);
String replicaDBAddress = getReplicaDBAddress(conn);
if (!masterDBAddress.equals(replicaDBAddress)) {
log.warning("Replica DB Address doesn't match with master DB Address -> " + replicaURL);
urlConn.setUsable(false);
continue; // next urlConn
}
Timestamp replicaTs = getReplicaVerificationTimestamp(conn);
if (replicaTs == null) {
log.warning("Could not get replica verification timestamp -> " + replicaURL);
urlConn.setUsable(false);
continue; // next urlConn
}
if (replicaTs.before(lastTs)) {
// not usable yet
continue; // next urlConn
}
try {
sql = DB.getDatabase().convertStatement(sql);
statementToReturn = conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
usedReplicaURL = urlConn.getReplicaURL();
} catch (SQLException e) {
log.warning("Error preparing statement in replica -> " + replicaURL + " / SQL = " + sql);
statementToReturn = null;
urlConn.setUsable(false);
}
} // end for urlConnList
if (statementToReturn != null)
break; // break iterations
boolean noMoreUsables = true;
for (URLReplicaConnection urlConn : urlConnList) {
if (urlConn.isUsable()) {
noMoreUsables = false;
break;
}
}
if (noMoreUsables)
break;
if (i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS-1) {
try {
log.warning("Waiting " + DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS + " milliseconds for replication to sync");
Thread.sleep(DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS);
} catch (InterruptedException e) {
;
}
}
} // end for iterations
if (statementToReturn == null)
log.warning("Abandoning replicas: none usable or max wait reached without sync");
// close any connection not used in the statement to return
for (URLReplicaConnection urlConn : urlConnList) {
Connection conn = urlConn.getConnection();
if (conn == null)
continue;
try {
if (statementToReturn != null && conn == statementToReturn.getConnection())
continue; // next connection
} catch (SQLException e) {
log.warning("Error getting connection from statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage());
}
try {
conn.close();
} catch (SQLException e) {
log.warning("Could not close connection statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage());
}
}
if (statementToReturn != null)
log.warning("Using replica for statement, URL = " + usedReplicaURL + " / SQL = " + sql); // WARNING during test phase, change to info before moving to production
return statementToReturn;
}
private static Timestamp getReplicaVerificationTimestamp(Connection conn) {
PreparedStatement stVerifySync = null;
ResultSet rsVerifySync = null;
Timestamp replicaTs = null;
try {
stVerifySync = conn.prepareStatement(sqlValidateSync);
rsVerifySync = stVerifySync.executeQuery();
if (rsVerifySync.next())
replicaTs = rsVerifySync.getTimestamp("lastupdate");
} catch (SQLException e) {
replicaTs = null;
} finally {
DB.close(rsVerifySync, stVerifySync);
}
return replicaTs;
}
private static String getReplicaDBAddress(Connection conn) {
PreparedStatement stVerifyAddr = null;
ResultSet rsVerifyAddr = null;
String dbAddr = null;
try {
stVerifyAddr = conn.prepareStatement(sqlValidateDBAddress);
rsVerifyAddr = stVerifyAddr.executeQuery();
if (rsVerifyAddr.next())
dbAddr = rsVerifyAddr.getString("DBAddress");
} catch (SQLException e) {
dbAddr = null;
} finally {
DB.close(rsVerifyAddr, stVerifyAddr);
}
return dbAddr;
}
private static Connection tryConnect(String replicaURL) {
// open connection to replica
Connection conn = null;
try {
conn = DB.getDatabase(replicaURL).getDriverConnection(replicaURL, m_user, m_pass);
} catch (SQLException e) {
log.warning("Could not get a connection to " + replicaURL + ", cause = " + e.getLocalizedMessage());
conn = null;
}
return conn;
}
private static Timestamp setMasterVerificationTimestamp() {
// update time in master database to verify synchronization of replicas
Timestamp lastTs = null;
try {
DB.executeUpdateEx(sqlUpdateSync, null);
lastTs = DB.getSQLValueTSEx(null, sqlValidateSync);
} catch (DBException e1) {
log.warning("Could not sync dbreplicasyncverifier, cause = " + e1.getLocalizedMessage());
lastTs = null;
}
return lastTs;
}
public static void closeReadReplicaStatement(Statement st) {
// close the connection associated to the statement
try {
st.getConnection().close();
} catch (SQLException e) {
log.warning("Error closing the read replica statement, cause = " + e.getLocalizedMessage());
}
}
/**
* Set Attributes from String (pares toStringLong())
* @param attributes attributes
*/
private static void setUserPass() {
if (m_user != null || m_pass != null)
return;
String attributes = Ini.getProperty (Ini.P_CONNECTION);
try {
attributes = attributes.substring(attributes.indexOf("[")+1, attributes.length() - 1);
String[] pairs= attributes.split("[,]");
for(String pair : pairs)
{
String[] pairComponents = pair.split("[=]");
String key = pairComponents[0];
String value = pairComponents.length == 2 ? unescape(pairComponents[1]) : "";
if ("UID".equalsIgnoreCase(key)) {
m_user = value;
} else if ("PWD".equalsIgnoreCase(key)) {
m_pass = value;
}
}
} catch (Exception e) {
log.log(Level.SEVERE, attributes + " - " + e.toString (), e);
}
} // setAttributes
private static String unescape(String value) {
value = value.replace("&eq;", "=");
value = value.replace("&comma;", ",");
return value;
}
/**
* URL connection class for DB Read Replica
*
*/
private static class URLReplicaConnection implements Comparable<URLReplicaConnection> {
private String replicaURL;
private int index;
private boolean usable = true;
private Connection conn = null;
public URLReplicaConnection(String replicaURL, int index) {
this.replicaURL = replicaURL;
this.index = index;
}
public String getReplicaURL() {
return replicaURL;
}
public boolean isUsable() {
return usable;
}
public void setUsable(boolean usable) {
this.usable = usable;
}
public Connection getConnection() {
return conn;
}
public void setConnection(Connection conn) {
this.conn = conn;
}
@Override
public int compareTo(URLReplicaConnection o) {
if (index > o.index)
return 1;
else if (index < o.index)
return -1;
return 0;
}
@Override
public String toString() {
return "replicaURL=" + replicaURL
+ " index=" + index
+ " usable=" + usable
+ " connection=" + conn;
}
}
}