1008086 IDEMPIERE-2771 implement semaphore in internal 2packs to avoid multiple servers applying same 2pack

This commit is contained in:
Diego Ruiz 2017-06-21 01:24:25 +02:00
parent 005419e6b3
commit e012b916e2
9 changed files with 200 additions and 67 deletions

View File

@ -0,0 +1,17 @@
SET SQLBLANKLINES ON
SET DEFINE OFF
-- Jun 19, 2017 8:54:41 PM CEST
-- 2Pack Semaphore
INSERT INTO AD_SysConfig (AD_SysConfig_ID,EntityType,ConfigurationLevel,Updated,Value,AD_SysConfig_UU,IsActive,Name,Created,AD_Org_ID,CreatedBy,UpdatedBy,AD_Client_ID) VALUES (200098,'D','S',TO_DATE('2017-06-19 20:54:40','YYYY-MM-DD HH24:MI:SS'),'AUTOMATIC_PACKIN_PROCESSING','cba425fc-4e0e-4a45-bc14-281a98a52c81','Y','AUTOMATIC_PACKIN_PROCESSING',TO_DATE('2017-06-19 20:54:40','YYYY-MM-DD HH24:MI:SS'),0,100,100,0)
;
-- Jun 19, 2017 8:55:06 PM CEST
INSERT INTO AD_SysConfig (AD_SysConfig_ID,EntityType,ConfigurationLevel,Updated,Value,AD_SysConfig_UU,IsActive,Name,Created,AD_Org_ID,CreatedBy,UpdatedBy,AD_Client_ID) VALUES (200099,'D','S',TO_DATE('2017-06-19 20:55:05','YYYY-MM-DD HH24:MI:SS'),'120','2dd1b89a-936e-47aa-b7e6-3b66649ed224','Y','AUTOMATIC_PACKIN_TIMEOUT',TO_DATE('2017-06-19 20:55:05','YYYY-MM-DD HH24:MI:SS'),0,100,100,0)
;
INSERT INTO AD_SysConfig (AD_SysConfig_ID,EntityType,ConfigurationLevel,Updated,Value,AD_SysConfig_UU,IsActive,Name,Created,AD_Org_ID,CreatedBy,UpdatedBy,AD_Client_ID) VALUES (200100,'D','S',TO_DATE('2017-06-20 12:12:07','YYYY-MM-DD HH24:MI:SS'),'5','49244d58-3306-4a38-9458-7fa8616214c2','Y','AUTOMATIC_PACKIN_RETRIES',TO_DATE('2017-06-20 12:12:07','YYYY-MM-DD HH24:MI:SS'),0,100,100,0)
;
SELECT register_migration_script('201706192054_Ticket_1008086.sql') FROM dual
;

View File

@ -0,0 +1,14 @@
-- Jun 19, 2017 8:54:41 PM CEST
-- 2Pack Semaphore
INSERT INTO AD_SysConfig (AD_SysConfig_ID,EntityType,ConfigurationLevel,Updated,Value,AD_SysConfig_UU,IsActive,Name,Created,AD_Org_ID,CreatedBy,UpdatedBy,AD_Client_ID) VALUES (200098,'D','S',TO_TIMESTAMP('2017-06-19 20:54:40','YYYY-MM-DD HH24:MI:SS'),'AUTOMATIC_PACKIN_PROCESSING','cba425fc-4e0e-4a45-bc14-281a98a52c81','Y','AUTOMATIC_PACKIN_PROCESSING',TO_TIMESTAMP('2017-06-19 20:54:40','YYYY-MM-DD HH24:MI:SS'),0,100,100,0)
;
-- Jun 19, 2017 8:55:06 PM CEST
INSERT INTO AD_SysConfig (AD_SysConfig_ID,EntityType,ConfigurationLevel,Updated,Value,AD_SysConfig_UU,IsActive,Name,Created,AD_Org_ID,CreatedBy,UpdatedBy,AD_Client_ID) VALUES (200099,'D','S',TO_TIMESTAMP('2017-06-19 20:55:05','YYYY-MM-DD HH24:MI:SS'),'120','2dd1b89a-936e-47aa-b7e6-3b66649ed224','Y','AUTOMATIC_PACKIN_TIMEOUT',TO_TIMESTAMP('2017-06-19 20:55:05','YYYY-MM-DD HH24:MI:SS'),0,100,100,0)
;
INSERT INTO AD_SysConfig (AD_SysConfig_ID,EntityType,ConfigurationLevel,Updated,Value,AD_SysConfig_UU,IsActive,Name,Created,AD_Org_ID,CreatedBy,UpdatedBy,AD_Client_ID) VALUES (200100,'D','S',TO_TIMESTAMP('2017-06-20 12:12:07','YYYY-MM-DD HH24:MI:SS'),'5','49244d58-3306-4a38-9458-7fa8616214c2','Y','AUTOMATIC_PACKIN_RETRIES',TO_TIMESTAMP('2017-06-20 12:12:07','YYYY-MM-DD HH24:MI:SS'),0,100,100,0)
;
SELECT register_migration_script('201706192054_Ticket_1008086.sql') FROM dual
;

View File

@ -198,6 +198,10 @@ public class MSysConfig extends X_AD_SysConfig
/** Cache */ /** Cache */
private static CCache<String, String> s_cache = new CCache<String, String>(Table_Name, 40, 0, true); private static CCache<String, String> s_cache = new CCache<String, String>(Table_Name, 40, 0, true);
public static final String AUTOMATIC_PACKIN_PROCESSING = "AUTOMATIC_PACKIN_PROCESSING";
public static final String AUTOMATIC_PACKIN_TIMEOUT = "AUTOMATIC_PACKIN_TIMEOUT";
public static final String AUTOMATIC_PACKIN_RETRIES = "AUTOMATIC_PACKIN_RETRIES";
/** /**
* Get system configuration property of type string * Get system configuration property of type string
* @param Name * @param Name

View File

@ -13,8 +13,6 @@ package org.adempiere.pipo.srv;
import java.io.File; import java.io.File;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import org.adempiere.base.IDictionaryService; import org.adempiere.base.IDictionaryService;
@ -32,11 +30,8 @@ public class PipoDictionaryService implements IDictionaryService {
CLogger logger = CLogger.getCLogger(PipoDictionaryService.class.getName()); CLogger logger = CLogger.getCLogger(PipoDictionaryService.class.getName());
private final Semaphore semaphore;
public PipoDictionaryService() { public PipoDictionaryService() {
super(); super();
semaphore = new Semaphore(1, true);
} }
@Override @Override
@ -49,12 +44,6 @@ public class PipoDictionaryService implements IDictionaryService {
X_AD_Package_Imp_Proc adPackageImp = null; X_AD_Package_Imp_Proc adPackageImp = null;
PackIn packIn = null; PackIn packIn = null;
try { try {
try {
semaphore.tryAcquire(120, TimeUnit.SECONDS);
} catch (InterruptedException e) {
semaphore.release();
semaphore.acquire();
}
trxName = Trx.createTrxName("PipoDS"); trxName = Trx.createTrxName("PipoDS");
packIn = new PackIn(); packIn = new PackIn();
packIn.setPackageName(context.getBundle().getSymbolicName()); packIn.setPackageName(context.getBundle().getSymbolicName());
@ -116,7 +105,6 @@ public class PipoDictionaryService implements IDictionaryService {
try { try {
Trx.get(trxName, false).close(); Trx.get(trxName, false).close();
} catch (Exception e) {} } catch (Exception e) {}
semaphore.release();
adPackageImp.save(); // ignoring exceptions adPackageImp.save(); // ignoring exceptions
if (adPackageImp != null && packIn != null) { if (adPackageImp != null && packIn != null) {

View File

@ -41,6 +41,7 @@ Import-Package: org.adempiere.base,
org.apache.xerces.xs.datatypes;version="2.9.0", org.apache.xerces.xs.datatypes;version="2.9.0",
org.apache.xml.serialize;version="2.9.0", org.apache.xml.serialize;version="2.9.0",
org.compiere, org.compiere,
org.compiere.db,
org.compiere.model, org.compiere.model,
org.compiere.util, org.compiere.util,
org.osgi.framework;version="1.5.0", org.osgi.framework;version="1.5.0",

View File

@ -0,0 +1,102 @@
/******************************************************************************
* Copyright (C) 2017 Diego Ruiz *
* Copyright (C) 2017 Trek Global *
* This program is free software; you can redistribute it and/or modify it *
* under the terms version 2 of the GNU General Public License as published *
* by the Free Software Foundation. This program is distributed in the hope *
* that it will be useful, but WITHOUT ANY WARRANTY; without even the implied *
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
* See the GNU General Public License for more details. *
* You should have received a copy of the GNU General Public License along *
* with this program; if not, write to the Free Software Foundation, Inc., *
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. *
*****************************************************************************/
package org.adempiere.plugin.utils;
import java.io.File;
import java.util.logging.Level;
import org.adempiere.base.IDictionaryService;
import org.compiere.model.MSysConfig;
import org.compiere.model.PO;
import org.compiere.model.Query;
import org.compiere.model.X_AD_Package_Imp;
import org.compiere.util.CLogger;
import org.compiere.util.DB;
import org.compiere.util.Env;
import org.compiere.util.Trx;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
public abstract class AbstractActivator implements BundleActivator, ServiceTrackerCustomizer<IDictionaryService, IDictionaryService> {
protected final static CLogger logger = CLogger.getCLogger(AbstractActivator.class.getName());
protected BundleContext context;
protected ServiceTracker<IDictionaryService, IDictionaryService> serviceTracker;
protected IDictionaryService service;
private String trxName = "";
protected boolean merge(File zipfile, String version) throws Exception {
boolean success = false;
if (!installedPackage(version)) {
service.merge(context, zipfile);
success = true;
} else {
logger.log(Level.SEVERE, "The file was already installed: " + zipfile.getName());
}
return success;
}
protected boolean installedPackage(String version) {
StringBuilder where = new StringBuilder("Name=? AND PK_Status = 'Completed successfully'");
Object[] params;
if (version != null) {
where.append(" AND PK_Version LIKE ?");
params = new Object[] { getName(), version + "%" };
} else {
params = new Object[] {getName()};
}
Query q = new Query(Env.getCtx(), X_AD_Package_Imp.Table_Name,
where.toString(), null);
q.setParameters(params);
return q.first() != null;
}
public abstract String getName();
public boolean getDBLock() {
int timeout = MSysConfig.getIntValue(MSysConfig.AUTOMATIC_PACKIN_TIMEOUT, 120);
int maxAttempts = MSysConfig.getIntValue(MSysConfig.AUTOMATIC_PACKIN_RETRIES, 5);
boolean lockAcquired = false;
while(maxAttempts > 0 && !lockAcquired) {
maxAttempts --;
if (getDBLock(timeout))
lockAcquired = true;
}
return lockAcquired;
}
public void releaseLock() {
Trx.get(trxName, false).close();
}
private boolean getDBLock(int timeout) {
return DB.getDatabase().forUpdate(getLockPO(), timeout);
}
private PO getLockPO() {
MSysConfig sysconfig = new Query(Env.getCtx(), MSysConfig.Table_Name,
"Name=? AND AD_Client_ID=0", null)
.setParameters(MSysConfig.AUTOMATIC_PACKIN_PROCESSING)
.first();
trxName = Trx.createTrxName("ActSysTrx");
sysconfig.set_TrxName(trxName);
return sysconfig;
}
}

View File

@ -16,18 +16,13 @@ import org.compiere.model.ServerStateChangeListener;
import org.compiere.model.X_AD_Package_Imp; import org.compiere.model.X_AD_Package_Imp;
import org.compiere.util.CLogger; import org.compiere.util.CLogger;
import org.compiere.util.Env; import org.compiere.util.Env;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext; import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference; import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker; import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
public class AdempiereActivator implements BundleActivator, ServiceTrackerCustomizer<IDictionaryService, IDictionaryService> { public class AdempiereActivator extends AbstractActivator {
protected final static CLogger logger = CLogger.getCLogger(AdempiereActivator.class.getName()); protected final static CLogger logger = CLogger.getCLogger(AdempiereActivator.class.getName());
private BundleContext context;
private ServiceTracker<IDictionaryService, IDictionaryService> serviceTracker;
private IDictionaryService service;
@Override @Override
public void start(BundleContext context) throws Exception { public void start(BundleContext context) throws Exception {
@ -53,27 +48,7 @@ public class AdempiereActivator implements BundleActivator, ServiceTrackerCustom
private void installPackage() { private void installPackage() {
// e.g. 1.0.0.qualifier, check only the "1.0.0" part // e.g. 1.0.0.qualifier, check only the "1.0.0" part
String version = getVersion(); String version = getPKVersion();
if (version != null)
{
int count = 0;
int index = -1;
for(int i = 0; i < version.length(); i++)
{
if(version.charAt(i) == '.')
count++;
if (count == 3)
{
index = i;
break;
}
}
if (index == -1)
index = version.length();
version = version.substring(0, index);
}
String where = "Name=? AND PK_Version LIKE ?"; String where = "Name=? AND PK_Version LIKE ?";
Query q = new Query(Env.getCtx(), X_AD_Package_Imp.Table_Name, Query q = new Query(Env.getCtx(), X_AD_Package_Imp.Table_Name,
@ -81,15 +56,45 @@ public class AdempiereActivator implements BundleActivator, ServiceTrackerCustom
q.setParameters(new Object[] { getName(), version + "%" }); q.setParameters(new Object[] { getName(), version + "%" });
X_AD_Package_Imp pkg = q.first(); X_AD_Package_Imp pkg = q.first();
if (pkg == null) { if (pkg == null) {
System.out.println("Installing " + getName() + " " + version + " ..."); if (getDBLock()) {
packIn(); System.out.println("Installing " + getName() + " " + version + " ...");
install(); packIn();
System.out.println(getName() + " " + version + " installed."); install();
releaseLock();
System.out.println(getName() + " " + version + " installed.");
} else {
logger.log(Level.SEVERE, "Could not acquire the DB lock to install:" + getName());
}
} else { } else {
if (logger.isLoggable(Level.INFO)) logger.info(getName() + " " + version + " was installed: " if (logger.isLoggable(Level.INFO)) logger.info(getName() + " " + version + " was installed: "
+ pkg.getCreated()); + pkg.getCreated());
} }
} }
private String getPKVersion() {
String version = getVersion();
if (version != null)
{
int count = 0;
int index = -1;
for(int i = 0; i < version.length(); i++)
{
if(version.charAt(i) == '.')
count++;
if (count == 3)
{
index = i;
break;
}
}
if (index == -1)
index = version.length();
version = version.substring(0, index);
}
return version;
}
protected void packIn() { protected void packIn() {
URL packout = context.getBundle().getEntry("/META-INF/2Pack.zip"); URL packout = context.getBundle().getEntry("/META-INF/2Pack.zip");
@ -106,7 +111,7 @@ public class AdempiereActivator implements BundleActivator, ServiceTrackerCustom
zipstream.write(buffer, 0, read); zipstream.write(buffer, 0, read);
} }
// call 2pack // call 2pack
service.merge(context, zipfile); merge(zipfile, getPKVersion());
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.SEVERE, "Pack in failed.", e); logger.log(Level.SEVERE, "Pack in failed.", e);
} }

View File

@ -35,23 +35,18 @@ import org.compiere.model.X_AD_Package_Imp;
import org.compiere.util.CLogger; import org.compiere.util.CLogger;
import org.compiere.util.Env; import org.compiere.util.Env;
import org.compiere.util.Trx; import org.compiere.util.Trx;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext; import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference; import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker; import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
/** /**
* *
* @author hengsin * @author hengsin
* *
*/ */
public class Incremental2PackActivator implements BundleActivator, ServiceTrackerCustomizer<IDictionaryService, IDictionaryService> { public class Incremental2PackActivator extends AbstractActivator {
protected final static CLogger logger = CLogger.getCLogger(Incremental2PackActivator.class.getName()); protected final static CLogger logger = CLogger.getCLogger(Incremental2PackActivator.class.getName());
private BundleContext context;
private ServiceTracker<IDictionaryService, IDictionaryService> serviceTracker;
private IDictionaryService service;
@Override @Override
public void start(BundleContext context) throws Exception { public void start(BundleContext context) throws Exception {
@ -189,13 +184,18 @@ public class Incremental2PackActivator implements BundleActivator, ServiceTracke
} }
}); });
for(TwoPackEntry entry : list) { if (getDBLock()) {
if (!installedVersions.contains(entry.version)) { for(TwoPackEntry entry : list) {
if (!packIn(entry.url)) { if (!installedVersions.contains(entry.version)) {
// stop processing further packages if one fail if (!packIn(entry.url)) {
break; // stop processing further packages if one fail
break;
}
} }
} }
releaseLock();
} else {
logger.log(Level.SEVERE, "Could not acquire the DB lock to install:" + getName());
} }
} }
@ -224,7 +224,8 @@ public class Incremental2PackActivator implements BundleActivator, ServiceTracke
zipstream.write(buffer, 0, read); zipstream.write(buffer, 0, read);
} }
// call 2pack // call 2pack
service.merge(context, zipfile); if (!merge(zipfile, extractVersionString(packout)))
return false;
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.SEVERE, "Pack in failed.", e); logger.log(Level.SEVERE, "Pack in failed.", e);
return false; return false;

View File

@ -35,23 +35,18 @@ import org.compiere.model.X_AD_Package_Imp;
import org.compiere.util.CLogger; import org.compiere.util.CLogger;
import org.compiere.util.Env; import org.compiere.util.Env;
import org.compiere.util.Util; import org.compiere.util.Util;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext; import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference; import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker; import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
/** /**
* *
* @author hengsin * @author hengsin
* *
*/ */
public class Version2PackActivator implements BundleActivator, ServiceTrackerCustomizer<IDictionaryService, IDictionaryService> { public class Version2PackActivator extends AbstractActivator {
protected final static CLogger logger = CLogger.getCLogger(Version2PackActivator.class.getName()); protected final static CLogger logger = CLogger.getCLogger(Version2PackActivator.class.getName());
private BundleContext context;
private ServiceTracker<IDictionaryService, IDictionaryService> serviceTracker;
private IDictionaryService service;
@Override @Override
public void start(BundleContext context) throws Exception { public void start(BundleContext context) throws Exception {
@ -162,11 +157,16 @@ public class Version2PackActivator implements BundleActivator, ServiceTrackerCus
} }
}); });
for(TwoPackEntry entry : list) { if (getDBLock()) {
if (!packIn(entry.url)) { for(TwoPackEntry entry : list) {
// stop processing further packages if one fail if (!packIn(entry.url)) {
break; // stop processing further packages if one fail
break;
}
} }
releaseLock();
} else {
logger.log(Level.SEVERE, "Could not acquire the DB lock to install:" + getName());
} }
} }
@ -195,7 +195,8 @@ public class Version2PackActivator implements BundleActivator, ServiceTrackerCus
zipstream.write(buffer, 0, read); zipstream.write(buffer, 0, read);
} }
// call 2pack // call 2pack
service.merge(context, zipfile); if (!merge(zipfile, extractVersionString(packout)))
return false;
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.SEVERE, "Pack in failed.", e); logger.log(Level.SEVERE, "Pack in failed.", e);
return false; return false;