IDEMPIERE-5732 Cache reset should happens after commit of transaction (#1850)

* IDEMPIERE-5732 Cache reset should happens after commit of transaction

* IDEMPIERE-5732 Cache reset should happens after commit of transaction

* - minor defensive programming

---------

Co-authored-by: hengsin <hengsin@gmail.com>
This commit is contained in:
Carlos Ruiz 2023-08-01 13:53:11 +02:00 committed by GitHub
parent f919931eaf
commit 159fbbd04c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 50 deletions

View File

@ -109,7 +109,10 @@ public final class Adempiere
private static CLogger log = null; private static CLogger log = null;
/** Thread pool **/ /** Thread pool **/
private static ScheduledThreadPoolExecutor threadPoolExecutor = null; private final static ScheduledThreadPoolExecutor threadPoolExecutor = createThreadPool();
static {
Trx.startTrxMonitor();
}
/** A list of event listeners for this component. */ /** A list of event listeners for this component. */
private static EventListenerList m_listenerList = new EventListenerList(); private static EventListenerList m_listenerList = new EventListenerList();
@ -584,8 +587,7 @@ public final class Adempiere
} }
} }
private static synchronized void createThreadPool() { private static ScheduledThreadPoolExecutor createThreadPool() {
if (threadPoolExecutor == null) {
int max = Runtime.getRuntime().availableProcessors() * 20; int max = Runtime.getRuntime().availableProcessors() * 20;
int defaultMax = max; int defaultMax = max;
Properties properties = Ini.getProperties(); Properties properties = Ini.getProperties();
@ -600,10 +602,7 @@ public final class Adempiere
} }
// start thread pool // start thread pool
threadPoolExecutor = new ScheduledThreadPoolExecutor(max); return new ScheduledThreadPoolExecutor(max);
Trx.startTrxMonitor();
}
} }
/** /**
@ -690,10 +689,7 @@ public final class Adempiere
} }
public static synchronized void stop() { public static synchronized void stop() {
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdown(); threadPoolExecutor.shutdown();
threadPoolExecutor = null;
}
log = null; log = null;
} }
@ -701,9 +697,7 @@ public final class Adempiere
* *
* @return {@link ScheduledThreadPoolExecutor} * @return {@link ScheduledThreadPoolExecutor}
*/ */
public static synchronized ScheduledThreadPoolExecutor getThreadPoolExecutor() { public static ScheduledThreadPoolExecutor getThreadPoolExecutor() {
if (threadPoolExecutor == null)
createThreadPool();
return threadPoolExecutor; return threadPoolExecutor;
} }

View File

@ -71,6 +71,7 @@ import org.compiere.util.Msg;
import org.compiere.util.SecureEngine; import org.compiere.util.SecureEngine;
import org.compiere.util.Trace; import org.compiere.util.Trace;
import org.compiere.util.Trx; import org.compiere.util.Trx;
import org.compiere.util.TrxEventListener;
import org.compiere.util.Util; import org.compiere.util.Util;
import org.compiere.util.ValueNamePair; import org.compiere.util.ValueNamePair;
import org.osgi.service.event.Event; import org.osgi.service.event.Event;
@ -2692,10 +2693,37 @@ public abstract class PO
if (!newRecord) if (!newRecord)
MRecentItem.clearLabel(p_info.getAD_Table_ID(), get_UUID()); MRecentItem.clearLabel(p_info.getAD_Table_ID(), get_UUID());
if (CacheMgt.get().hasCache(p_info.getTableName())) { if (CacheMgt.get().hasCache(p_info.getTableName())) {
boolean cacheResetScheduled = false;
if (get_TrxName() != null) {
Trx trx = Trx.get(get_TrxName(), false);
if (trx != null) {
trx.addTrxEventListener(new TrxEventListener() {
@Override
public void afterRollback(Trx trx, boolean success) {
trx.removeTrxEventListener(this);
}
@Override
public void afterCommit(Trx sav, boolean success) {
if (success)
if (!newRecord) if (!newRecord)
Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), get_ID())); Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), get_ID()));
else if (get_ID() > 0 && success) else if (get_ID() > 0)
Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().newRecord(p_info.getTableName(), get_ID())); Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().newRecord(p_info.getTableName(), get_ID()));
trx.removeTrxEventListener(this);
}
@Override
public void afterClose(Trx trx) {
}
});
cacheResetScheduled = true;
}
}
if (!cacheResetScheduled) {
if (!newRecord)
Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), get_ID()));
else if (get_ID() > 0)
Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().newRecord(p_info.getTableName(), get_ID()));
}
} }
return success; return success;
@ -4169,6 +4197,26 @@ public abstract class PO
} }
else else
{ {
if (CacheMgt.get().hasCache(p_info.getTableName())) {
Trx trxdel = Trx.get(get_TrxName(), false);
if (trxdel != null) {
trxdel.addTrxEventListener(new TrxEventListener() {
@Override
public void afterRollback(Trx trxdel, boolean success) {
trxdel.removeTrxEventListener(this);
}
@Override
public void afterCommit(Trx trxdel, boolean success) {
if (success)
Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), Record_ID));
trxdel.removeTrxEventListener(this);
}
@Override
public void afterClose(Trx trxdel) {
}
});
}
}
if (localTrx != null) if (localTrx != null)
{ {
try { try {
@ -4196,7 +4244,6 @@ public abstract class PO
int size = p_info.getColumnCount(); int size = p_info.getColumnCount();
m_oldValues = new Object[size]; m_oldValues = new Object[size];
m_newValues = new Object[size]; m_newValues = new Object[size];
Adempiere.getThreadPoolExecutor().submit(() -> CacheMgt.get().reset(p_info.getTableName(), Record_ID));
} }
} }
finally finally

View File

@ -21,14 +21,13 @@ import java.io.StringWriter;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Savepoint; import java.sql.Savepoint;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
@ -108,7 +107,7 @@ public class Trx
private static final Trx.TrxMonitor s_monitor = new Trx.TrxMonitor(); private static final Trx.TrxMonitor s_monitor = new Trx.TrxMonitor();
private List<TrxEventListener> listeners = new ArrayList<TrxEventListener>(); private ConcurrentLinkedQueue<TrxEventListener> listeners = new ConcurrentLinkedQueue<TrxEventListener>();
protected Exception trace; protected Exception trace;
@ -347,8 +346,7 @@ public class Trx
} // rollback } // rollback
private void fireAfterRollbackEvent(boolean success) { private void fireAfterRollbackEvent(boolean success) {
TrxEventListener[] copies = listeners.toArray(new TrxEventListener[0]); for(TrxEventListener l : listeners) {
for(TrxEventListener l : copies) {
l.afterRollback(this, success); l.afterRollback(this, success);
} }
} }
@ -433,8 +431,7 @@ public class Trx
} // commit } // commit
private void fireAfterCommitEvent(boolean success) { private void fireAfterCommitEvent(boolean success) {
TrxEventListener[] copies = listeners.toArray(new TrxEventListener[0]); for(TrxEventListener l : listeners) {
for(TrxEventListener l : copies) {
l.afterCommit(this, success); l.afterCommit(this, success);
} }
} }
@ -541,8 +538,7 @@ public class Trx
} // close } // close
private void fireAfterCloseEvent() { private void fireAfterCloseEvent() {
TrxEventListener[] copies = listeners.toArray(new TrxEventListener[0]); for(TrxEventListener l : listeners) {
for(TrxEventListener l : copies) {
l.afterClose(this); l.afterClose(this);
} }
} }
@ -735,16 +731,12 @@ public class Trx
* @param listener * @param listener
*/ */
public void addTrxEventListener(TrxEventListener listener) { public void addTrxEventListener(TrxEventListener listener) {
synchronized (listeners) {
listeners.add(listener); listeners.add(listener);
} }
}
public boolean removeTrxEventListener(TrxEventListener listener) { public boolean removeTrxEventListener(TrxEventListener listener) {
synchronized (listeners) {
return listeners.remove(listener); return listeners.remove(listener);
} }
}
public String getStrackTrace() public String getStrackTrace()
{ {

View File

@ -97,9 +97,10 @@ public class MatchInvTestIsolated extends AbstractTestCase {
categoryAcct.saveEx(); categoryAcct.saveEx();
} }
try {
int mulchId = DictionaryIDs.M_Product.MULCH.id; // Mulch product int mulchId = DictionaryIDs.M_Product.MULCH.id; // Mulch product
MProduct mulch = new MProduct(Env.getCtx(), mulchId, getTrxName()); MProduct mulch = new MProduct(Env.getCtx(), mulchId, null);
int mulchCategoryId = mulch.getM_Product_Category_ID();
try {
mulch.setM_Product_Category_ID(category.get_ID()); mulch.setM_Product_Category_ID(category.get_ID());
mulch.saveEx(); mulch.saveEx();
@ -257,6 +258,8 @@ public class MatchInvTestIsolated extends AbstractTestCase {
} }
} finally { } finally {
getTrx().rollback(); getTrx().rollback();
mulch.setM_Product_Category_ID(mulchCategoryId);
mulch.saveEx();
category.deleteEx(true); category.deleteEx(true);
} }
} }

View File

@ -213,9 +213,11 @@ public class CacheTest extends AbstractTestCase {
assertEquals(oak, p2.getM_Product_ID()); assertEquals(oak, p2.getM_Product_ID());
assertTrue(pc.getHit() > hit, "Second get of product Oak, cache hit should increase"); assertTrue(pc.getHit() > hit, "Second get of product Oak, cache hit should increase");
String oakDescription = p2.getDescription();
p2 = new MProduct(Env.getCtx(), p2, getTrxName()); p2 = new MProduct(Env.getCtx(), p2, getTrxName());
p2.setDescription("Test Update @ " + System.currentTimeMillis()); p2.setDescription("Test Update @ " + System.currentTimeMillis());
p2.saveEx(); p2.saveEx();
commit();
//get after p2 update, miss should increase //get after p2 update, miss should increase
//wait 500ms since cache reset after update is async //wait 500ms since cache reset after update is async
@ -242,6 +244,7 @@ public class CacheTest extends AbstractTestCase {
p3.saveEx(); p3.saveEx();
p3.deleteEx(true); p3.deleteEx(true);
commit();
//cache for p2 not effected by p3 delete, hit should increase //cache for p2 not effected by p3 delete, hit should increase
hit = pc.getHit(); hit = pc.getHit();
@ -256,6 +259,11 @@ public class CacheTest extends AbstractTestCase {
p2.saveEx(); p2.saveEx();
rollback(); rollback();
//revert description update
p2 = new MProduct(Env.getCtx(), oak, null);
p2.setDescription(oakDescription);
p2.saveEx();
} }
@Test @Test