Stabilization of replication

kind regards
Victor Perez
www.e-evolution.com
Link to SF Tracker: http://sourceforge.net/support/tracker.php?aid=2936561
This commit is contained in:
vpj-cd 2010-01-21 20:07:30 +00:00
parent 09e78c8486
commit b610097f06
6 changed files with 110 additions and 64 deletions

View File

@ -48,6 +48,9 @@ import org.compiere.util.CLogger;
* @author victor.perez@e-evolution.com, www.e-evolution.com
* <li> BF2875989 Deactivate replication records are include to replication
* <li> https://sourceforge.net/tracker/?func=detail&aid=2875989&group_id=176962&atid=879332
* <li>[ 2195090 ] Stabilization of replication
* <li>https://sourceforge.net/tracker/?func=detail&atid=879332&aid=2936561&group_id=176962
*
* @version $Id$
*/
public class ExportModelValidator implements ModelValidator
@ -193,7 +196,6 @@ public class ExportModelValidator implements ModelValidator
|| type == TIMING_AFTER_CLOSE
|| type == TIMING_AFTER_REVERSECORRECT
|| type == TIMING_AFTER_VOID
|| type == TIMING_AFTER_VOID
|| type == TIMING_AFTER_PREPARE
)
{

View File

@ -48,6 +48,7 @@ import org.compiere.model.MColumn;
import org.compiere.model.MReplicationStrategy;
import org.compiere.model.MTable;
import org.compiere.model.PO;
import org.compiere.model.Query;
import org.compiere.model.X_EXP_FormatLine;
import org.compiere.util.CLogger;
import org.compiere.util.DB;
@ -70,6 +71,10 @@ import org.w3c.dom.Text;
* @author Antonio Cañaveral, e-Evolution
* <li>[ 2195016 ] Implementation delete records messages
* <li>http://sourceforge.net/tracker/index.php?func=detail&aid=2195016&group_id=176962&atid=879332
* @author victor.perez@e-evolution.com, e-Evolution
* <li>[ 2195090 ] Stabilization of replication
* <li>https://sourceforge.net/tracker/?func=detail&atid=879332&aid=2936561&group_id=176962
*
*/
public class ExportHelper {
@ -230,11 +235,13 @@ public class ExportHelper {
MClient client = MClient.get (exportFormat.getCtx(), m_AD_Client_ID);
MTable table = MTable.get(exportFormat.getCtx(), exportFormat.getAD_Table_ID());
log.info("Table = " + table);
int[] ids = MTable.getAllIDs(table.getTableName(), where, null);
for (int id : ids)
Collection<PO> datas = new Query(exportFormat.getCtx(),table.getTableName(), exportFormat.getWhereClause(), exportFormat.get_TrxName())
.setOnlyActiveRecords(true)
.list();
for (PO po : datas)
{
PO po = table.getPO(id, exportFormat.get_TrxName());
log.info("Client = " + client.toString());
log.finest("po.getAD_Org_ID() = " + po.getAD_Org_ID());
log.finest("po.get_TrxName() = " + po.get_TrxName());
@ -242,9 +249,10 @@ public class ExportHelper {
po.set_TrxName("exportRecord");
}
if (po.get_KeyColumns().length > 1 || po.get_KeyColumns().length < 1) {
throw new Exception(Msg.getMsg (po.getCtx(), "ExportMultiColumnNotSupported"));
if (po.get_KeyColumns().length < 1) {
throw new Exception(Msg.getMsg (po.getCtx(), "ExportNoneColumnKeyNotSupported"));//TODO: Create Mesagge.
}
// TODO - get proper Export Format!
String version = "3.2.0";
outDocument = createNewDocument();
@ -445,7 +453,8 @@ public class ExportHelper {
// process Embedded Export Format
int embeddedFormat_ID = formatLine.getEXP_EmbeddedFormat_ID();
MEXPFormat embeddedFormat = new MEXPFormat(masterPO.getCtx(), embeddedFormat_ID, masterPO.get_TrxName());
//get from cache
MEXPFormat embeddedFormat = MEXPFormat.get(masterPO.getCtx(), embeddedFormat_ID, masterPO.get_TrxName());
MTable tableEmbedded = MTable.get(masterPO.getCtx(), embeddedFormat.getAD_Table_ID());
log.info("Table Embedded = " + tableEmbedded);
@ -494,7 +503,8 @@ public class ExportHelper {
// process Referenced Export Format
int embeddedFormat_ID = formatLine.getEXP_EmbeddedFormat_ID();
MEXPFormat embeddedFormat = new MEXPFormat(masterPO.getCtx(), embeddedFormat_ID, masterPO.get_TrxName());
//get from cache
MEXPFormat embeddedFormat = MEXPFormat.get(masterPO.getCtx(), embeddedFormat_ID, masterPO.get_TrxName());
MTable tableEmbedded = MTable.get(masterPO.getCtx(), embeddedFormat.getAD_Table_ID());
log.info("Table Embedded = " + tableEmbedded);

View File

@ -66,6 +66,10 @@ import org.w3c.dom.NodeList;
* @author Antonio Cañaveral, e-Evolution
* <li>[ 2195016 ] Implementation delete records messages
* <li>http://sourceforge.net/tracker/index.php?func=detail&aid=2195016&group_id=176962&atid=879332
* @author victor.perez@e-evolution.com, e-Evolution
* <li>[ 2195090 ] Stabilization of replication
* <li>https://sourceforge.net/tracker/?func=detail&atid=879332&aid=2936561&group_id=176962
*
*/
public class ImportHelper {
@ -165,8 +169,6 @@ public class ImportHelper {
PO po = importElement(ctx, result, rootElement, expFormat, ReplicationType, trxName);
boolean resultSave=false;
if(po != null)
{
// Here must invoke other method else we get cycle...
@ -174,13 +176,13 @@ public class ImportHelper {
|| ModelValidator.TYPE_BEFORE_DELETE_REPLICATION == ReplicationEvent
|| ModelValidator.TYPE_DELETE == ReplicationEvent)
{
resultSave=po.delete(true);
po.deleteEx(true);
}
else
{
if(X_AD_ReplicationTable.REPLICATIONTYPE_Broadcast.equals(ReplicationType))
{
resultSave = po.saveReplica(true);
po.saveReplica(true);
MReplicationStrategy rplStrategy = new MReplicationStrategy(client.getCtx(), client.getAD_ReplicationStrategy_ID(), null);
ExportHelper expHelper = new ExportHelper(client, rplStrategy);
expHelper.exportRecord( po,
@ -191,7 +193,7 @@ public class ImportHelper {
else if(X_AD_ReplicationTable.REPLICATIONTYPE_Merge.equals(ReplicationType)
|| X_AD_ReplicationTable.REPLICATIONTYPE_Reference.equals(ReplicationType))
{
resultSave = po.saveReplica(true);
po.saveReplica(true);
}
/*else if (X_AD_ReplicationTable.REPLICATIONTYPE_Reference.equals(ReplicationType))
{
@ -210,8 +212,7 @@ public class ImportHelper {
}
}
result.append("ResultSave=").append(resultSave).append("; ");
result.append("Save Successful ;");
/*if (resultSave)
{
if(ReplicationMode == MReplicationStrategy.REPLICATION_DOCUMENT &&
@ -265,10 +266,8 @@ public class ImportHelper {
throw new Exception(Msg.getMsg(ctx, "EDIMultiColumnNotSupported"));
}
StringBuffer orderBy = new StringBuffer(MEXPFormatLine.COLUMNNAME_IsMandatory).append(" DESC ")
.append(", ").append(MEXPFormatLine.COLUMNNAME_Position);
Collection<MEXPFormatLine> formatLines = expFormat.getFormatLinesOrderedBy(orderBy.toString());
Collection<MEXPFormatLine> formatLines = expFormat.getFormatLinesOrderedBy(MEXPFormatLine.COLUMNNAME_IsMandatory
+ " , " + MEXPFormatLine.COLUMNNAME_Position);
if (formatLines == null || formatLines.size() < 1)
{
throw new Exception(Msg.getMsg(ctx, "EXPFormatNoLines"));
@ -281,6 +280,8 @@ public class ImportHelper {
log.info("formatLine: [" + formatLine.toString() + "]");
//Get the value
Object value = getValueFromFormat(formatLine,po,rootElement,result,ReplicationType,trxName);
if (value == null || value.toString().equals(""))
continue;
//Set the value
setReplicaValues(value, formatLine, po, result);
}
@ -313,7 +314,8 @@ public class ImportHelper {
else if (MEXPFormatLine.TYPE_ReferencedEXPFormat.equals(line.getType()))
{
// Referenced Export Format
MEXPFormat referencedExpFormat = new MEXPFormat(ctx, line.getEXP_EmbeddedFormat_ID(), trxName);
//get from cache
MEXPFormat referencedExpFormat = MEXPFormat.get(ctx, line.getEXP_EmbeddedFormat_ID(), trxName);
log.info("referencedExpFormat = " + referencedExpFormat);
int refRecord_ID = 0;
@ -340,25 +342,14 @@ public class ImportHelper {
}
else if (MEXPFormatLine.TYPE_EmbeddedEXPFormat.equals(line.getType()))
{
boolean resSave = false;
if (po.get_ID() == 0)
if(po.is_Changed())
{
resSave = po.saveReplica(true);
result.append("ResultSave-MasterPO=").append(resSave).append("; ");
log.info("ResultSave-MasterPO = " + resSave);
}
else
{
resSave = true;
}
if (!resSave)
{
throw new Exception("Failed to save Master PO");
po.saveReplica(true);
}
// Embedded Export Format It is used for Parent-Son records like Order&OrderLine
MEXPFormat referencedExpFormat = new MEXPFormat(ctx, line.getEXP_EmbeddedFormat_ID(), trxName);
//get from cache
MEXPFormat referencedExpFormat = MEXPFormat.get(ctx, line.getEXP_EmbeddedFormat_ID(), trxName);
log.info("embeddedExpFormat = " + referencedExpFormat);
NodeList nodeList = XMLHelper.getNodeList("/"+rootElement.getNodeName() + "/" + line.getValue(), rootElement);
@ -372,9 +363,9 @@ public class ImportHelper {
log.info("=== BEGIN RECURSION CALL ===");
embeddedPo = importElement(ctx, result, referencedElement, referencedExpFormat,ReplicationType, trxName);
log.info("embeddedPo = " + embeddedPo);
embeddedPo.saveReplica(true);
result.append(" Embedded Save Successful ; ");
boolean rSave = embeddedPo.saveReplica(true);
result.append("ResultSave-EmbeddedPO=").append(rSave).append("; ");
}
}
@ -474,7 +465,7 @@ public class ImportHelper {
//
if (!Util.isEmpty(value.toString()))
{
value = new Integer(value.toString());
value = new BigDecimal(value.toString());
}
else
{
@ -600,15 +591,20 @@ public class ImportHelper {
{
// Referenced Export Format
log.info("referencedExpFormat.EXP_EmbeddedFormat_ID = " + uniqueFormatLine.getEXP_EmbeddedFormat_ID());
MEXPFormat referencedExpFormat = new MEXPFormat(ctx, uniqueFormatLine.getEXP_EmbeddedFormat_ID(), trxName);
//get from cache
MEXPFormat referencedExpFormat = MEXPFormat.get(ctx, uniqueFormatLine.getEXP_EmbeddedFormat_ID(), trxName);
log.info("referencedExpFormat = " + referencedExpFormat);
int record_ID = 0;
// Find Record_ID by ???Value??? In fact by Columns set as Part Of Unique Index in Export Format!
Element referencedNode = ((Element) rootElement.getElementsByTagName(uniqueFormatLine.getValue()).item(0));
log.info("referencedNode = " + referencedNode);
if (referencedNode == null)
{
throw new IllegalArgumentException("referencedNode can't be null!");
}
record_ID = getID(ctx, referencedExpFormat, referencedNode, uniqueFormatLine.getValue(), trxName);
log.info("record_ID = " + record_ID);
cols[col] = new Integer(record_ID);

View File

@ -38,11 +38,16 @@ import java.util.Properties;
import org.compiere.util.CCache;
import org.compiere.util.CLogger;
/**
* @author Trifon N. Trifonov
* @author Antonio Cañaveral, e-Evolution
* <li>[ 2195090 ] Implementing ExportFormat cache
* <li>http://sourceforge.net/tracker/index.php?func=detail&aid=2195090&group_id=176962&atid=879335
* @author victor.perez@e-evolution.com, e-Evolution
* <li>[ 2195090 ] Stabilization of replication
* <li>https://sourceforge.net/tracker/?func=detail&atid=879332&aid=2936561&group_id=176962
*
*/
public class MEXPFormat extends X_EXP_Format {
@ -54,9 +59,14 @@ public class MEXPFormat extends X_EXP_Format {
/** Static Logger */
private static CLogger s_log = CLogger.getCLogger (MEXPFormat.class);
private static CCache<String,MEXPFormat> s_cache = new CCache<String,MEXPFormat>("MEXPFormat", 50 );
private static CCache<String,MEXPFormat> s_cache = new CCache<String,MEXPFormat>(MEXPFormat.Table_Name, 50 );
private static CCache<Integer,MEXPFormat> exp_format_by_id_cache = new CCache<Integer,MEXPFormat>(MEXPFormat.Table_Name, 50);
public MEXPFormat(Properties ctx, int EXP_Format_ID, String trxName) {
private Collection<MEXPFormatLine> m_lines = null;
private Collection<MEXPFormatLine> m_lines_unique = null;
public MEXPFormat(Properties ctx, int EXP_Format_ID, String trxName)
{
super(ctx, EXP_Format_ID, trxName);
}
@ -70,22 +80,44 @@ public class MEXPFormat extends X_EXP_Format {
public Collection<MEXPFormatLine> getFormatLinesOrderedBy(String orderBy)
{
if(m_lines != null)
{
return m_lines;
}
final String clauseWhere = X_EXP_FormatLine.COLUMNNAME_EXP_Format_ID + "=?";
return new Query(getCtx() , I_EXP_FormatLine.Table_Name, clauseWhere , get_TrxName())
m_lines = new Query(getCtx() , I_EXP_FormatLine.Table_Name, clauseWhere , get_TrxName())
.setOnlyActiveRecords(true)
.setParameters(new Object[]{getEXP_Format_ID()})
.setOrderBy(orderBy)
.list();
return m_lines;
}
public Collection<MEXPFormatLine> getUniqueColumns() throws SQLException {
if (m_lines_unique != null)
return m_lines_unique;
final String clauseWhere = X_EXP_FormatLine.COLUMNNAME_EXP_Format_ID+"= ?"
+ " AND " + X_EXP_FormatLine.COLUMNNAME_IsPartUniqueIndex +"= ?";
return new Query(getCtx(), I_EXP_FormatLine.Table_Name, clauseWhere, get_TrxName())
m_lines_unique = new Query(getCtx(), I_EXP_FormatLine.Table_Name, clauseWhere, get_TrxName())
.setOnlyActiveRecords(true)
.setParameters(new Object[]{getEXP_Format_ID(), "Y"})
.setOrderBy(X_EXP_FormatLine.COLUMNNAME_Position)
.list();
return m_lines_unique;
}
public static MEXPFormat get(Properties ctx, int EXP_Format_ID, String trxName)
{
MEXPFormat exp_format = exp_format_by_id_cache.get(EXP_Format_ID);
if(exp_format != null)
return exp_format;
exp_format = new MEXPFormat(ctx, EXP_Format_ID , trxName);
exp_format.getFormatLines();
exp_format_by_id_cache.put(EXP_Format_ID, exp_format);
return exp_format;
}
public static MEXPFormat getFormatByValueAD_Client_IDAndVersion(Properties ctx, String value, int AD_Client_ID, String version, String trxName)
@ -102,7 +134,9 @@ public class MEXPFormat extends X_EXP_Format {
retValue = (MEXPFormat) new Query(ctx,X_EXP_Format.Table_Name,whereCluse.toString(),trxName)
.setParameters(new Object[] {value,AD_Client_ID,version}).first();
retValue.getFormatLines();
s_cache.put (key, retValue);
exp_format_by_id_cache.put(retValue.getEXP_Format_ID(), retValue);
return retValue;
}
@ -116,15 +150,16 @@ public class MEXPFormat extends X_EXP_Format {
if(retValue!=null)
return retValue;
StringBuffer whereCluse = new StringBuffer(" AD_Client_ID = ? ")
StringBuffer whereClause = new StringBuffer(" AD_Client_ID = ? ")
.append(" AND ").append(X_EXP_Format.COLUMNNAME_AD_Table_ID).append(" = ? ")
.append(" AND ").append(X_EXP_Format.COLUMNNAME_Version).append(" = ?");
retValue = (MEXPFormat) new Query(ctx,X_EXP_Format.Table_Name,whereCluse.toString(),trxName)
.setParameters(new Object[] {AD_Client_ID,AD_Table_ID,version}).first();
retValue = (MEXPFormat) new Query(ctx,X_EXP_Format.Table_Name,whereClause.toString(),trxName)
.setParameters(new Object[] {AD_Client_ID,AD_Table_ID,version})
.first();
retValue.getFormatLines();
s_cache.put (key, retValue);
exp_format_by_id_cache.put(retValue.getEXP_Format_ID(), retValue);
return retValue;
}

View File

@ -2229,10 +2229,10 @@ public abstract class PO
return save();
} // save
public boolean saveReplica (boolean isFromReplication)
public void saveReplica (boolean isFromReplication) throws AdempiereException
{
setReplication(isFromReplication);
return save();
saveEx();
}
/**

View File

@ -207,7 +207,7 @@ public class TopicListener implements MessageListener {
}
catch (Exception e)
{
log.info("Connection with clientID '" + clientID +"' already exists");
log.info("Connection with clientID '" + clientID +"' already exists" + e.toString());
conn.close();
return;
}
@ -256,7 +256,8 @@ public class TopicListener implements MessageListener {
conn.start();
log.finest("Waiting for JMS messages...");
if(replicationProcessor !=null)
{
MIMPProcessorLog pLog = new MIMPProcessorLog(replicationProcessor.getMImportProcessor(), "Connected to JMS Server. Waiting for messages!");
StringBuffer logReference = new StringBuffer("topicName = ").append(topicName)
.append(", subscriptionName = ").append( subscriptionName )
@ -266,6 +267,8 @@ public class TopicListener implements MessageListener {
log.finest("Result Save = " + resultSave);
}
}
/**
*
*/