Stabilization of replication

1.- fix the indentation
2.- fix the save exception
3.- fix close the JMS connection if an exception us catch.

kind regards
Victor Perez
www.e-evolution.com

Link to SF Tracker: http://sourceforge.net/support/tracker.php?aid=2936561
This commit is contained in:
vpj-cd 2010-02-08 01:43:46 +00:00
parent 4830d4c749
commit bec8791241
2 changed files with 78 additions and 57 deletions

View File

@ -279,7 +279,7 @@ public class TopicListener implements MessageListener {
TextMessage txtMessage = (TextMessage) message; TextMessage txtMessage = (TextMessage) message;
String text = txtMessage.getText(); String text = txtMessage.getText();
//log.finest("Received message: \n" + text ); log.finest("Received message: \n" + text );
Document documentToBeImported = XMLHelper.createDocumentFromString( text ); Document documentToBeImported = XMLHelper.createDocumentFromString( text );
StringBuffer result = new StringBuffer(); StringBuffer result = new StringBuffer();
@ -288,7 +288,10 @@ public class TopicListener implements MessageListener {
impHelper.importXMLDocument(result, documentToBeImported, trxName ); impHelper.importXMLDocument(result, documentToBeImported, trxName );
log.finest("Replicated ...");
if(replicationProcessor != null)
{
MIMPProcessorLog pLog = new MIMPProcessorLog(replicationProcessor.getMImportProcessor(), "Imported Document!"); MIMPProcessorLog pLog = new MIMPProcessorLog(replicationProcessor.getMImportProcessor(), "Imported Document!");
//pLog.setReference("topicName = " + topicName ); //pLog.setReference("topicName = " + topicName );
if (text.length() > 2000 ) { if (text.length() > 2000 ) {
@ -297,19 +300,33 @@ public class TopicListener implements MessageListener {
pLog.setTextMsg( text); pLog.setTextMsg( text);
} }
boolean resultSave = pLog.save(); pLog.saveEx();
log.finest("Result Save = " + resultSave); }
session.commit(); session.commit();
} catch (Exception e) { }
replicationProcessor.setProcessRunning(false); catch (Exception e)
try { {
log.finest("Rollback = " + e.toString());
try
{
session.rollback(); session.rollback();
} catch (JMSException e1) { stop();
//replicationProcessor.interrupt();
//replicationProcessor.join();
replicationProcessor.setProcessRunning(false);
}
/*catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace(); e1.printStackTrace();
}*/
catch (JMSException e2)
{
e2.printStackTrace();
} }
e.printStackTrace(); e.printStackTrace();
} }
} else { } else {

View File

@ -85,16 +85,21 @@ public class ReplicationProcessor extends AdempiereServer {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected void doWork() { protected void doWork()
if (isProcessRunning) { {
if (isProcessRunning)
{
// process is already started successfully! // process is already started successfully!
} else { }
else
{
// process is not started! // process is not started!
m_summary = new StringBuffer(); m_summary = new StringBuffer();
String trxName = mImportProcessor.get_TrxName(); String trxName = mImportProcessor.get_TrxName();
if ( trxName == null || "".equals(trxName) ) { if ( trxName == null || "".equals(trxName) )
{
// trxName = "ImportProcessor-" + System.currentTimeMillis(); // trxName = "ImportProcessor-" + System.currentTimeMillis();
} }
log.fine("trxName = " + trxName); log.fine("trxName = " + trxName);
@ -107,40 +112,39 @@ public class ReplicationProcessor extends AdempiereServer {
String javaClass = impProcessor_Type.getJavaClass(); String javaClass = impProcessor_Type.getJavaClass();
IImportProcessor importProcessor = null; IImportProcessor importProcessor = null;
try { try
{
Class clazz = Class.forName(javaClass); Class clazz = Class.forName(javaClass);
importProcessor = (IImportProcessor)clazz.newInstance(); importProcessor = (IImportProcessor)clazz.newInstance();
importProcessor.process(mImportProcessor.getCtx(), this, trxName ); importProcessor.process(mImportProcessor.getCtx(), this, trxName );
}
} catch (Exception e) { catch (Exception e)
{
isProcessRunning = false; isProcessRunning = false;
log.fine("ReplicationProcessor caught an exception !!!" ); log.fine("ReplicationProcessor caught an exception !!!" );
e.printStackTrace(); e.printStackTrace();
log.severe(e.getMessage());
log.severe( e.getMessage() );
MIMPProcessorLog pLog = new MIMPProcessorLog(mImportProcessor, e.getMessage() ); MIMPProcessorLog pLog = new MIMPProcessorLog(mImportProcessor, e.getMessage() );
pLog.setReference("#" + String.valueOf(p_runCount) + " - " + TimeUtil.formatElapsed(new Timestamp(p_startWork))); pLog.setReference("#" + String.valueOf(p_runCount) + " - " + TimeUtil.formatElapsed(new Timestamp(p_startWork)));
boolean resultSave = pLog.save(); pLog.saveEx();
try
try { {
importProcessor.stop(); importProcessor.stop();
} catch (Exception e1) { }
catch (Exception e1)
{
e1.printStackTrace(); e1.printStackTrace();
MIMPProcessorLog pLog2 = new MIMPProcessorLog(mImportProcessor, e1.getMessage() ); MIMPProcessorLog pLog2 = new MIMPProcessorLog(mImportProcessor, e1.getMessage() );
pLog2.saveEx();
boolean resultSave2 = pLog2.save();
} }
} }
// //
int no = mImportProcessor.deleteLog(); int no = mImportProcessor.deleteLog();
m_summary.append("Logs Records deleted=").append(no).append("; "); m_summary.append("Logs Records deleted=").append(no).append("; ");
// //
MIMPProcessorLog pLog = new MIMPProcessorLog(mImportProcessor, m_summary.toString()); MIMPProcessorLog pLog = new MIMPProcessorLog(mImportProcessor, m_summary.toString());
pLog.setReference("#" + String.valueOf(p_runCount) + " - " + TimeUtil.formatElapsed(new Timestamp(p_startWork))); pLog.setReference("#" + String.valueOf(p_runCount) + " - " + TimeUtil.formatElapsed(new Timestamp(p_startWork)));
boolean resultSave = pLog.save(); pLog.saveEx();
} }
} }