From 1fd56b7609b853610d19ec55c2e35f82daef374a Mon Sep 17 00:00:00 2001 From: Carlos Ruiz Date: Tue, 18 Dec 2018 14:22:17 +0100 Subject: [PATCH] IDEMPIERE-3850 Performance improvement: implement reporting from read-only replica (from TrekGlobal) --- .../i6.1z/oracle/201708031855_1008281.sql | 9 + .../i6.1z/postgresql/201708031855_1008281.sql | 13 + .../src/org/compiere/db/StatementProxy.java | 2 + .../src/org/compiere/model/MSysConfig.java | 5 +- .../src/org/compiere/print/DataEngine.java | 2 +- .../src/org/compiere/util/DB.java | 54 +++ .../src/org/compiere/util/DBReadReplica.java | 349 ++++++++++++++++++ 7 files changed, 432 insertions(+), 2 deletions(-) create mode 100644 migration/i6.1z/oracle/201708031855_1008281.sql create mode 100644 migration/i6.1z/postgresql/201708031855_1008281.sql create mode 100644 org.adempiere.base/src/org/compiere/util/DBReadReplica.java diff --git a/migration/i6.1z/oracle/201708031855_1008281.sql b/migration/i6.1z/oracle/201708031855_1008281.sql new file mode 100644 index 0000000000..cdfcb9715b --- /dev/null +++ b/migration/i6.1z/oracle/201708031855_1008281.sql @@ -0,0 +1,9 @@ +-- 1008281 - Implement reporting from read-only replica + +CREATE TABLE dbreplicasyncverifier (lastupdate date); + +INSERT INTO dbreplicasyncverifier values (to_date('1900-01-01 00:00:00', 'yyyy-mm-dd HH24:MI:SS')); + +SELECT register_migration_script('201708031855_1008281.sql') FROM dual +; + diff --git a/migration/i6.1z/postgresql/201708031855_1008281.sql b/migration/i6.1z/postgresql/201708031855_1008281.sql new file mode 100644 index 0000000000..32a2a09307 --- /dev/null +++ b/migration/i6.1z/postgresql/201708031855_1008281.sql @@ -0,0 +1,13 @@ +-- 1008281 - Implement reporting from read-only replica + +CREATE TABLE dbreplicasyncverifier (lastupdate timestamp); + +INSERT INTO dbreplicasyncverifier values (to_timestamp('1900-01-01 00:00:00', 'yyyy-mm-dd HH24:MI:SS')); + +CREATE OR REPLACE RULE insert_dual AS ON INSERT TO dual DO INSTEAD NOTHING; + +CREATE OR REPLACE RULE delete_dual AS ON DELETE TO dual DO INSTEAD NOTHING; + +SELECT register_migration_script('201708031855_1008281.sql') FROM dual +; + diff --git a/org.adempiere.base/src/org/compiere/db/StatementProxy.java b/org.adempiere.base/src/org/compiere/db/StatementProxy.java index ee592f0348..97e4b3b308 100644 --- a/org.adempiere.base/src/org/compiere/db/StatementProxy.java +++ b/org.adempiere.base/src/org/compiere/db/StatementProxy.java @@ -95,6 +95,8 @@ public class StatementProxy implements InvocationHandler { return null; } else if (name.equals("getSql") && (args == null || args.length == 0)) { return getSql(); + } else if (name.equals("equals") && (args != null && args.length == 1)) { + return equals(args[0]); } String logSql = null; diff --git a/org.adempiere.base/src/org/compiere/model/MSysConfig.java b/org.adempiere.base/src/org/compiere/model/MSysConfig.java index 712c77ed8c..f515a6e80e 100644 --- a/org.adempiere.base/src/org/compiere/model/MSysConfig.java +++ b/org.adempiere.base/src/org/compiere/model/MSysConfig.java @@ -42,7 +42,7 @@ public class MSysConfig extends X_AD_SysConfig /** * */ - private static final long serialVersionUID = -1273442365045945366L; + private static final long serialVersionUID = 2856526441538434702L; public static final String ADDRESS_VALIDATION = "ADDRESS_VALIDATION"; public static final String ALERT_SEND_ATTACHMENT_AS_XLS = "ALERT_SEND_ATTACHMENT_AS_XLS"; @@ -80,6 +80,9 @@ public class MSysConfig extends X_AD_SysConfig public static final String CLIENT_ACCOUNTING = "CLIENT_ACCOUNTING"; public static final String DEFAULT_COA_PATH = "DEFAULT_COA_PATH"; public static final String DEFAULT_ENTITYTYPE = "DEFAULT_ENTITYTYPE"; // used as default in entity type columns with get_sysconfig + public static final String DB_READ_REPLICA_NORMAL_MAX_ITERATIONS = "DB_READ_REPLICA_NORMAL_MAX_ITERATIONS"; + public static final String DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS = "DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS"; + public static final String DB_READ_REPLICA_URLS = "DB_READ_REPLICA_URLS"; public static final String DICTIONARY_ID_COMMENTS = "DICTIONARY_ID_COMMENTS"; public static final String DICTIONARY_ID_PASSWORD = "DICTIONARY_ID_PASSWORD"; public static final String DICTIONARY_ID_USE_CENTRALIZED_ID = "DICTIONARY_ID_USE_CENTRALIZED_ID"; diff --git a/org.adempiere.base/src/org/compiere/print/DataEngine.java b/org.adempiere.base/src/org/compiere/print/DataEngine.java index e65a67f1a6..082ab78689 100644 --- a/org.adempiere.base/src/org/compiere/print/DataEngine.java +++ b/org.adempiere.base/src/org/compiere/print/DataEngine.java @@ -830,7 +830,7 @@ public class DataEngine ResultSet rs = null; try { - pstmt = DB.prepareStatement(pd.getSQL(), m_trxName); + pstmt = DB.prepareNormalReadReplicaStatement(pd.getSQL(), m_trxName); rs = pstmt.executeQuery(); // Row Loop while (rs.next()) diff --git a/org.adempiere.base/src/org/compiere/util/DB.java b/org.adempiere.base/src/org/compiere/util/DB.java index 41065426a4..072d6696d2 100644 --- a/org.adempiere.base/src/org/compiere/util/DB.java +++ b/org.adempiere.base/src/org/compiere/util/DB.java @@ -33,6 +33,7 @@ import java.sql.Timestamp; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.logging.Level; @@ -2114,6 +2115,15 @@ public final class DB } catch (SQLException e) { ; } + if (readReplicaStatements.contains(st)) { + try { + DBReadReplica.closeReadReplicaStatement(st); + } catch (Exception e) { + ; + } finally { + readReplicaStatements.remove(st); + } + } } /** @@ -2524,4 +2534,48 @@ public final class DB return rowsArray; } + /** Read Replica Statements List */ + private static final List readReplicaStatements = Collections.synchronizedList(new ArrayList()); + + /** + * Prepare Read Replica Statement + * @param sql sql statement + * @param trxName transaction + * @return Prepared Statement (from replica if possible, otherwise normal statement) + */ + public static PreparedStatement prepareNormalReadReplicaStatement(String sql, String trxName) { + int concurrency = ResultSet.CONCUR_READ_ONLY; + String upper = sql.toUpperCase(); + if (upper.startsWith("UPDATE ") || upper.startsWith("DELETE ")) + concurrency = ResultSet.CONCUR_UPDATABLE; + return prepareNormalReadReplicaStatement(sql, ResultSet.TYPE_FORWARD_ONLY, concurrency, trxName); + } + + /** + * Prepare Read Replica Statement + * @param sql sql statement + * @param resultSetType - ResultSet.TYPE_FORWARD_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.TYPE_SCROLL_SENSITIVE + * @param resultSetConcurrency - ResultSet.CONCUR_READ_ONLY or ResultSet.CONCUR_UPDATABLE + * @param trxName transaction name + * @return Prepared Statement (from replica if possible, otherwise normal statement) + */ + private static PreparedStatement prepareNormalReadReplicaStatement(String sql, int resultSetType, int resultSetConcurrency, String trxName) { + if (sql == null || sql.length() == 0) + throw new IllegalArgumentException("No SQL"); + boolean useReadReplica = MSysConfig.getValue(MSysConfig.DB_READ_REPLICA_URLS) != null; + if ( trxName == null + && useReadReplica + && resultSetType == ResultSet.TYPE_FORWARD_ONLY + && resultSetConcurrency == ResultSet.CONCUR_READ_ONLY) { + // this is a candidate for a read replica connection (read-only, forward-only, no-trx), try to obtain one, otherwise fallback to normal + PreparedStatement stmt = DBReadReplica.prepareNormalReadReplicaStatement(sql, resultSetType, resultSetConcurrency, trxName); + if (stmt != null) { + readReplicaStatements.add(stmt); + return stmt; + } + } + // + return ProxyFactory.newCPreparedStatement(resultSetType, resultSetConcurrency, sql, trxName); + } + } // DB diff --git a/org.adempiere.base/src/org/compiere/util/DBReadReplica.java b/org.adempiere.base/src/org/compiere/util/DBReadReplica.java new file mode 100644 index 0000000000..35ab65f763 --- /dev/null +++ b/org.adempiere.base/src/org/compiere/util/DBReadReplica.java @@ -0,0 +1,349 @@ +/********************************************************************** +* Copyright (C) Contributors * +* * +* This program is free software; you can redistribute it and/or * +* modify it under the terms of the GNU General Public License * +* as published by the Free Software Foundation; either version 2 * +* of the License, or (at your option) any later version. * +* * +* This program is distributed in the hope that it will be useful, * +* but WITHOUT ANY WARRANTY; without even the implied warranty of * +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * +* GNU General Public License for more details. * +* * +* You should have received a copy of the GNU General Public License * +* along with this program; if not, write to the Free Software * +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, * +* MA 02110-1301, USA. * +* * +* Contributors: * +* - Trek Global * +* - Carlos Ruiz - globalqss * +**********************************************************************/ +package org.compiere.util; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; + +import org.adempiere.exceptions.DBException; +import org.compiere.model.MSysConfig; +import org.compiere.model.MSystem; + +public class DBReadReplica { + + private static String m_user = null; + private static String m_pass = null; + final private static String sqlValidateSync = "SELECT lastupdate FROM dbreplicasyncverifier"; + final private static String sqlUpdateSync = "UPDATE dbreplicasyncverifier SET lastupdate=SYSDATE"; + final private static String sqlValidateDBAddress = "SELECT DBAddress FROM AD_System"; + private volatile static int shift = 0; // for load balancing between different replicas + + /** Logger */ + private static CLogger log = CLogger.getCLogger (DBReadReplica.class); + + /** + * Prepare Normal Read Replica Statement + * @param sql sql statement + * @param resultSetType - ResultSet.TYPE_FORWARD_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.TYPE_SCROLL_SENSITIVE + * @param resultSetConcurrency - ResultSet.CONCUR_READ_ONLY or ResultSet.CONCUR_UPDATABLE + * @param trxName transaction + * @return Prepared Statement (from replica if possible, otherwise null) + * + * @TODO: Add profiles fast and slow for other read replica statements + * + */ + public static PreparedStatement prepareNormalReadReplicaStatement(String sql, int resultSetType, int resultSetConcurrency, String trxName) { + String replicaURLsConfig = MSysConfig.getValue(MSysConfig.DB_READ_REPLICA_URLS); // list of JDBC URLs separated by | + if (Util.isEmpty(replicaURLsConfig, true)) + return null; + int DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS, 5000); + int DB_READ_REPLICA_NORMAL_MAX_ITERATIONS = MSysConfig.getIntValue(MSysConfig.DB_READ_REPLICA_NORMAL_MAX_ITERATIONS, 3); + + setUserPass(); + + Timestamp lastTs = setMasterVerificationTimestamp(); + String masterDBAddress = MSystem.get(Env.getCtx()).getDBAddress(); + + List urlConnList = new ArrayList(); + + String[] replicaURLs = replicaURLsConfig.split("\\|"); + int index = 0; + int length = replicaURLs.length; + for (int i = 0; i < length; i++) { + String replicaURL = replicaURLs[i].trim(); + index = (i + shift) % length; + URLReplicaConnection urc = new URLReplicaConnection(replicaURL, index); + urlConnList.add(urc); + } + shift++; + if (shift >= length) + shift = 0; + + // sort URLs by index - for load balancing + Collections.sort(urlConnList); + + PreparedStatement statementToReturn = null; + String usedReplicaURL = null; + for (int i = 0; i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS; i++) { + + for (URLReplicaConnection urlConn : urlConnList) { + + if (! urlConn.isUsable()) + continue; // next urlConn + + String replicaURL = urlConn.getReplicaURL(); + + Connection conn = urlConn.getConnection(); + if (conn == null && urlConn.isUsable()) { + conn = tryConnect(replicaURL); + } + if (conn == null) { + urlConn.setUsable(false); + continue; // next urlConn + } + urlConn.setConnection(conn); + + String replicaDBAddress = getReplicaDBAddress(conn); + if (!masterDBAddress.equals(replicaDBAddress)) { + log.warning("Replica DB Address doesn't match with master DB Address -> " + replicaURL); + urlConn.setUsable(false); + continue; // next urlConn + } + + Timestamp replicaTs = getReplicaVerificationTimestamp(conn); + if (replicaTs == null) { + log.warning("Could not get replica verification timestamp -> " + replicaURL); + urlConn.setUsable(false); + continue; // next urlConn + } + + if (replicaTs.before(lastTs)) { + // not usable yet + continue; // next urlConn + } + try { + sql = DB.getDatabase().convertStatement(sql); + statementToReturn = conn.prepareStatement(sql, resultSetType, resultSetConcurrency); + usedReplicaURL = urlConn.getReplicaURL(); + } catch (SQLException e) { + log.warning("Error preparing statement in replica -> " + replicaURL + " / SQL = " + sql); + statementToReturn = null; + urlConn.setUsable(false); + } + } // end for urlConnList + + if (statementToReturn != null) + break; // break iterations + + boolean noMoreUsables = true; + for (URLReplicaConnection urlConn : urlConnList) { + if (urlConn.isUsable()) { + noMoreUsables = false; + break; + } + } + + if (noMoreUsables) + break; + + if (i < DB_READ_REPLICA_NORMAL_MAX_ITERATIONS-1) { + try { + log.warning("Waiting " + DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS + " milliseconds for replication to sync"); + Thread.sleep(DB_READ_REPLICA_NORMAL_TIMEOUT_IN_MILLISECONDS); + } catch (InterruptedException e) { + ; + } + } + } // end for iterations + + if (statementToReturn == null) + log.warning("Abandoning replicas: none usable or max wait reached without sync"); + + // close any connection not used in the statement to return + for (URLReplicaConnection urlConn : urlConnList) { + Connection conn = urlConn.getConnection(); + if (conn == null) + continue; + try { + if (statementToReturn != null && conn == statementToReturn.getConnection()) + continue; // next connection + } catch (SQLException e) { + log.warning("Error getting connection from statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage()); + } + try { + conn.close(); + } catch (SQLException e) { + log.warning("Could not close connection statement on replica, URL = " + urlConn.getReplicaURL() + ", cause = " + e.getLocalizedMessage()); + } + } + + if (statementToReturn != null) + log.warning("Using replica for statement, URL = " + usedReplicaURL + " / SQL = " + sql); // WARNING during test phase, change to info before moving to production + return statementToReturn; + } + + private static Timestamp getReplicaVerificationTimestamp(Connection conn) { + PreparedStatement stVerifySync = null; + ResultSet rsVerifySync = null; + Timestamp replicaTs = null; + try { + stVerifySync = conn.prepareStatement(sqlValidateSync); + rsVerifySync = stVerifySync.executeQuery(); + if (rsVerifySync.next()) + replicaTs = rsVerifySync.getTimestamp("lastupdate"); + } catch (SQLException e) { + replicaTs = null; + } finally { + DB.close(rsVerifySync, stVerifySync); + } + return replicaTs; + } + + private static String getReplicaDBAddress(Connection conn) { + PreparedStatement stVerifyAddr = null; + ResultSet rsVerifyAddr = null; + String dbAddr = null; + try { + stVerifyAddr = conn.prepareStatement(sqlValidateDBAddress); + rsVerifyAddr = stVerifyAddr.executeQuery(); + if (rsVerifyAddr.next()) + dbAddr = rsVerifyAddr.getString("DBAddress"); + } catch (SQLException e) { + dbAddr = null; + } finally { + DB.close(rsVerifyAddr, stVerifyAddr); + } + return dbAddr; + } + + private static Connection tryConnect(String replicaURL) { + // open connection to replica + Connection conn = null; + try { + conn = DB.getDatabase(replicaURL).getDriverConnection(replicaURL, m_user, m_pass); + } catch (SQLException e) { + log.warning("Could not get a connection to " + replicaURL + ", cause = " + e.getLocalizedMessage()); + conn = null; + } + return conn; + } + + private static Timestamp setMasterVerificationTimestamp() { + // update time in master database to verify synchronization of replicas + Timestamp lastTs = null; + try { + DB.executeUpdateEx(sqlUpdateSync, null); + lastTs = DB.getSQLValueTSEx(null, sqlValidateSync); + } catch (DBException e1) { + log.warning("Could not sync dbreplicasyncverifier, cause = " + e1.getLocalizedMessage()); + lastTs = null; + } + return lastTs; + } + + public static void closeReadReplicaStatement(Statement st) { + // close the connection associated to the statement + try { + st.getConnection().close(); + } catch (SQLException e) { + log.warning("Error closing the read replica statement, cause = " + e.getLocalizedMessage()); + } + } + + /** + * Set Attributes from String (pares toStringLong()) + * @param attributes attributes + */ + private static void setUserPass() { + if (m_user != null || m_pass != null) + return; + String attributes = Ini.getProperty (Ini.P_CONNECTION); + try { + attributes = attributes.substring(attributes.indexOf("[")+1, attributes.length() - 1); + String[] pairs= attributes.split("[,]"); + for(String pair : pairs) + { + String[] pairComponents = pair.split("[=]"); + String key = pairComponents[0]; + String value = pairComponents.length == 2 ? unescape(pairComponents[1]) : ""; + if ("UID".equalsIgnoreCase(key)) { + m_user = value; + } else if ("PWD".equalsIgnoreCase(key)) { + m_pass = value; + } + } + } catch (Exception e) { + log.log(Level.SEVERE, attributes + " - " + e.toString (), e); + } + } // setAttributes + + private static String unescape(String value) { + value = value.replace("&eq;", "="); + value = value.replace(",", ","); + return value; + } + + /** + * URL connection class for DB Read Replica + * + */ + private static class URLReplicaConnection implements Comparable { + + private String replicaURL; + private int index; + private boolean usable = true; + private Connection conn = null; + + public URLReplicaConnection(String replicaURL, int index) { + this.replicaURL = replicaURL; + this.index = index; + } + + public String getReplicaURL() { + return replicaURL; + } + + public boolean isUsable() { + return usable; + } + + public void setUsable(boolean usable) { + this.usable = usable; + } + + public Connection getConnection() { + return conn; + } + + public void setConnection(Connection conn) { + this.conn = conn; + } + + @Override + public int compareTo(URLReplicaConnection o) { + if (index > o.index) + return 1; + else if (index < o.index) + return -1; + return 0; + } + + @Override + public String toString() { + return "replicaURL=" + replicaURL + + " index=" + index + + " usable=" + usable + + " connection=" + conn; + } + + } + +}