From 159fbbd04c0879bfa0aab8e0f4e1417871d015ed Mon Sep 17 00:00:00 2001 From: Carlos Ruiz Date: Tue, 1 Aug 2023 13:53:11 +0200 Subject: [PATCH] IDEMPIERE-5732 Cache reset should happens after commit of transaction (#1850) * IDEMPIERE-5732 Cache reset should happens after commit of transaction * IDEMPIERE-5732 Cache reset should happens after commit of transaction * - minor defensive programming --------- Co-authored-by: hengsin --- .../src/org/compiere/Adempiere.java | 48 +++++++--------- .../src/org/compiere/model/PO.java | 57 +++++++++++++++++-- .../src/org/compiere/util/Trx.java | 22 +++---- .../test/base/MatchInvTestIsolated.java | 9 ++- .../idempiere/test/performance/CacheTest.java | 8 +++ 5 files changed, 94 insertions(+), 50 deletions(-) diff --git a/org.adempiere.base/src/org/compiere/Adempiere.java b/org.adempiere.base/src/org/compiere/Adempiere.java index 7c5f5b3274..9f5a0121e6 100644 --- a/org.adempiere.base/src/org/compiere/Adempiere.java +++ b/org.adempiere.base/src/org/compiere/Adempiere.java @@ -109,7 +109,10 @@ public final class Adempiere private static CLogger log = null; /** Thread pool **/ - private static ScheduledThreadPoolExecutor threadPoolExecutor = null; + private final static ScheduledThreadPoolExecutor threadPoolExecutor = createThreadPool(); + static { + Trx.startTrxMonitor(); + } /** A list of event listeners for this component. */ private static EventListenerList m_listenerList = new EventListenerList(); @@ -584,26 +587,22 @@ public final class Adempiere } } - private static synchronized void createThreadPool() { - if (threadPoolExecutor == null) { - int max = Runtime.getRuntime().availableProcessors() * 20; - int defaultMax = max; - Properties properties = Ini.getProperties(); - String maxSize = properties.getProperty("MaxThreadPoolSize"); - if (maxSize != null) { - try { - max = Integer.parseInt(maxSize); - } catch (Exception e) {} - } - if (max <= 0) { - max = defaultMax; - } - - // start thread pool - threadPoolExecutor = new ScheduledThreadPoolExecutor(max); - - Trx.startTrxMonitor(); + private static ScheduledThreadPoolExecutor createThreadPool() { + int max = Runtime.getRuntime().availableProcessors() * 20; + int defaultMax = max; + Properties properties = Ini.getProperties(); + String maxSize = properties.getProperty("MaxThreadPoolSize"); + if (maxSize != null) { + try { + max = Integer.parseInt(maxSize); + } catch (Exception e) {} } + if (max <= 0) { + max = defaultMax; + } + + // start thread pool + return new ScheduledThreadPoolExecutor(max); } /** @@ -690,10 +689,7 @@ public final class Adempiere } public static synchronized void stop() { - if (threadPoolExecutor != null) { - threadPoolExecutor.shutdown(); - threadPoolExecutor = null; - } + threadPoolExecutor.shutdown(); log = null; } @@ -701,9 +697,7 @@ public final class Adempiere * * @return {@link ScheduledThreadPoolExecutor} */ - public static synchronized ScheduledThreadPoolExecutor getThreadPoolExecutor() { - if (threadPoolExecutor == null) - createThreadPool(); + public static ScheduledThreadPoolExecutor getThreadPoolExecutor() { return threadPoolExecutor; } diff --git a/org.adempiere.base/src/org/compiere/model/PO.java b/org.adempiere.base/src/org/compiere/model/PO.java index 45d29e4f94..c1fa383598 100644 --- a/org.adempiere.base/src/org/compiere/model/PO.java +++ b/org.adempiere.base/src/org/compiere/model/PO.java @@ -71,6 +71,7 @@ import org.compiere.util.Msg; import org.compiere.util.SecureEngine; import org.compiere.util.Trace; import org.compiere.util.Trx; +import org.compiere.util.TrxEventListener; import org.compiere.util.Util; import org.compiere.util.ValueNamePair; import org.osgi.service.event.Event; @@ -2692,10 +2693,37 @@ public abstract class PO if (!newRecord) MRecentItem.clearLabel(p_info.getAD_Table_ID(), get_UUID()); if (CacheMgt.get().hasCache(p_info.getTableName())) { - if (!newRecord) - Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), get_ID())); - else if (get_ID() > 0 && success) - Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().newRecord(p_info.getTableName(), get_ID())); + boolean cacheResetScheduled = false; + if (get_TrxName() != null) { + Trx trx = Trx.get(get_TrxName(), false); + if (trx != null) { + trx.addTrxEventListener(new TrxEventListener() { + @Override + public void afterRollback(Trx trx, boolean success) { + trx.removeTrxEventListener(this); + } + @Override + public void afterCommit(Trx sav, boolean success) { + if (success) + if (!newRecord) + Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), get_ID())); + else if (get_ID() > 0) + Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().newRecord(p_info.getTableName(), get_ID())); + trx.removeTrxEventListener(this); + } + @Override + public void afterClose(Trx trx) { + } + }); + cacheResetScheduled = true; + } + } + if (!cacheResetScheduled) { + if (!newRecord) + Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), get_ID())); + else if (get_ID() > 0) + Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().newRecord(p_info.getTableName(), get_ID())); + } } return success; @@ -4169,6 +4197,26 @@ public abstract class PO } else { + if (CacheMgt.get().hasCache(p_info.getTableName())) { + Trx trxdel = Trx.get(get_TrxName(), false); + if (trxdel != null) { + trxdel.addTrxEventListener(new TrxEventListener() { + @Override + public void afterRollback(Trx trxdel, boolean success) { + trxdel.removeTrxEventListener(this); + } + @Override + public void afterCommit(Trx trxdel, boolean success) { + if (success) + Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), Record_ID)); + trxdel.removeTrxEventListener(this); + } + @Override + public void afterClose(Trx trxdel) { + } + }); + } + } if (localTrx != null) { try { @@ -4196,7 +4244,6 @@ public abstract class PO int size = p_info.getColumnCount(); m_oldValues = new Object[size]; m_newValues = new Object[size]; - Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), Record_ID)); } } finally diff --git a/org.adempiere.base/src/org/compiere/util/Trx.java b/org.adempiere.base/src/org/compiere/util/Trx.java index 1ecdecec8a..cbe40526a5 100644 --- a/org.adempiere.base/src/org/compiere/util/Trx.java +++ b/org.adempiere.base/src/org/compiere/util/Trx.java @@ -21,14 +21,13 @@ import java.io.StringWriter; import java.sql.Connection; import java.sql.SQLException; import java.sql.Savepoint; -import java.util.ArrayList; import java.util.Collection; import java.util.Date; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -108,7 +107,7 @@ public class Trx private static final Trx.TrxMonitor s_monitor = new Trx.TrxMonitor(); - private List listeners = new ArrayList(); + private ConcurrentLinkedQueue listeners = new ConcurrentLinkedQueue(); protected Exception trace; @@ -347,8 +346,7 @@ public class Trx } // rollback private void fireAfterRollbackEvent(boolean success) { - TrxEventListener[] copies = listeners.toArray(new TrxEventListener[0]); - for(TrxEventListener l : copies) { + for(TrxEventListener l : listeners) { l.afterRollback(this, success); } } @@ -433,8 +431,7 @@ public class Trx } // commit private void fireAfterCommitEvent(boolean success) { - TrxEventListener[] copies = listeners.toArray(new TrxEventListener[0]); - for(TrxEventListener l : copies) { + for(TrxEventListener l : listeners) { l.afterCommit(this, success); } } @@ -541,8 +538,7 @@ public class Trx } // close private void fireAfterCloseEvent() { - TrxEventListener[] copies = listeners.toArray(new TrxEventListener[0]); - for(TrxEventListener l : copies) { + for(TrxEventListener l : listeners) { l.afterClose(this); } } @@ -735,15 +731,11 @@ public class Trx * @param listener */ public void addTrxEventListener(TrxEventListener listener) { - synchronized (listeners) { - listeners.add(listener); - } + listeners.add(listener); } public boolean removeTrxEventListener(TrxEventListener listener) { - synchronized (listeners) { - return listeners.remove(listener); - } + return listeners.remove(listener); } public String getStrackTrace() diff --git a/org.idempiere.test/src/org/idempiere/test/base/MatchInvTestIsolated.java b/org.idempiere.test/src/org/idempiere/test/base/MatchInvTestIsolated.java index e0934d0e33..b7c59daa34 100644 --- a/org.idempiere.test/src/org/idempiere/test/base/MatchInvTestIsolated.java +++ b/org.idempiere.test/src/org/idempiere/test/base/MatchInvTestIsolated.java @@ -97,9 +97,10 @@ public class MatchInvTestIsolated extends AbstractTestCase { categoryAcct.saveEx(); } - try { - int mulchId = DictionaryIDs.M_Product.MULCH.id; // Mulch product - MProduct mulch = new MProduct(Env.getCtx(), mulchId, getTrxName()); + int mulchId = DictionaryIDs.M_Product.MULCH.id; // Mulch product + MProduct mulch = new MProduct(Env.getCtx(), mulchId, null); + int mulchCategoryId = mulch.getM_Product_Category_ID(); + try { mulch.setM_Product_Category_ID(category.get_ID()); mulch.saveEx(); @@ -257,6 +258,8 @@ public class MatchInvTestIsolated extends AbstractTestCase { } } finally { getTrx().rollback(); + mulch.setM_Product_Category_ID(mulchCategoryId); + mulch.saveEx(); category.deleteEx(true); } } diff --git a/org.idempiere.test/src/org/idempiere/test/performance/CacheTest.java b/org.idempiere.test/src/org/idempiere/test/performance/CacheTest.java index 4877ba758a..e2265de736 100644 --- a/org.idempiere.test/src/org/idempiere/test/performance/CacheTest.java +++ b/org.idempiere.test/src/org/idempiere/test/performance/CacheTest.java @@ -213,9 +213,11 @@ public class CacheTest extends AbstractTestCase { assertEquals(oak, p2.getM_Product_ID()); assertTrue(pc.getHit() > hit, "Second get of product Oak, cache hit should increase"); + String oakDescription = p2.getDescription(); p2 = new MProduct(Env.getCtx(), p2, getTrxName()); p2.setDescription("Test Update @ " + System.currentTimeMillis()); p2.saveEx(); + commit(); //get after p2 update, miss should increase //wait 500ms since cache reset after update is async @@ -242,6 +244,7 @@ public class CacheTest extends AbstractTestCase { p3.saveEx(); p3.deleteEx(true); + commit(); //cache for p2 not effected by p3 delete, hit should increase hit = pc.getHit(); @@ -256,6 +259,11 @@ public class CacheTest extends AbstractTestCase { p2.saveEx(); rollback(); + + //revert description update + p2 = new MProduct(Env.getCtx(), oak, null); + p2.setDescription(oakDescription); + p2.saveEx(); } @Test