diff --git a/serverRoot/src/main/server/org/adempiere/server/rpl/imp/TopicListener.java b/serverRoot/src/main/server/org/adempiere/server/rpl/imp/TopicListener.java index 39bf952c95..8a36c9f545 100644 --- a/serverRoot/src/main/server/org/adempiere/server/rpl/imp/TopicListener.java +++ b/serverRoot/src/main/server/org/adempiere/server/rpl/imp/TopicListener.java @@ -279,7 +279,7 @@ public class TopicListener implements MessageListener { TextMessage txtMessage = (TextMessage) message; String text = txtMessage.getText(); - //log.finest("Received message: \n" + text ); + log.finest("Received message: \n" + text ); Document documentToBeImported = XMLHelper.createDocumentFromString( text ); StringBuffer result = new StringBuffer(); @@ -288,28 +288,45 @@ public class TopicListener implements MessageListener { impHelper.importXMLDocument(result, documentToBeImported, trxName ); - - MIMPProcessorLog pLog = new MIMPProcessorLog(replicationProcessor.getMImportProcessor(), "Imported Document!"); - //pLog.setReference("topicName = " + topicName ); - if (text.length() > 2000 ) { - pLog.setTextMsg( text.substring(0, 1999) ); - } else { - pLog.setTextMsg( text); + log.finest("Replicated ..."); + + if(replicationProcessor != null) + { + MIMPProcessorLog pLog = new MIMPProcessorLog(replicationProcessor.getMImportProcessor(), "Imported Document!"); + //pLog.setReference("topicName = " + topicName ); + if (text.length() > 2000 ) { + pLog.setTextMsg( text.substring(0, 1999) ); + } else { + pLog.setTextMsg( text); + } + + pLog.saveEx(); } - boolean resultSave = pLog.save(); - log.finest("Result Save = " + resultSave); - session.commit(); - } catch (Exception e) { - replicationProcessor.setProcessRunning(false); - try { + } + catch (Exception e) + { + log.finest("Rollback = " + e.toString()); + try + { session.rollback(); - } catch (JMSException e1) { - e1.printStackTrace(); + stop(); + //replicationProcessor.interrupt(); + //replicationProcessor.join(); + replicationProcessor.setProcessRunning(false); + } + /*catch (InterruptedException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + }*/ + catch (JMSException e2) + { + e2.printStackTrace(); } e.printStackTrace(); + } } else { diff --git a/serverRoot/src/main/server/org/compiere/server/ReplicationProcessor.java b/serverRoot/src/main/server/org/compiere/server/ReplicationProcessor.java index cc691d5cc2..fed57b86c7 100644 --- a/serverRoot/src/main/server/org/compiere/server/ReplicationProcessor.java +++ b/serverRoot/src/main/server/org/compiere/server/ReplicationProcessor.java @@ -85,62 +85,66 @@ public class ReplicationProcessor extends AdempiereServer { @SuppressWarnings("unchecked") @Override - protected void doWork() { - if (isProcessRunning) { + protected void doWork() + { + if (isProcessRunning) + { // process is already started successfully! - } else { - // process is not started! - - m_summary = new StringBuffer(); - String trxName = mImportProcessor.get_TrxName(); - if ( trxName == null || "".equals(trxName) ) { + } + else + { + // process is not started! + + m_summary = new StringBuffer(); + String trxName = mImportProcessor.get_TrxName(); + if ( trxName == null || "".equals(trxName) ) + { // trxName = "ImportProcessor-" + System.currentTimeMillis(); - } - log.fine("trxName = " + trxName); - log.fine("ImportProcessor = " + mImportProcessor); - - int IMP_ProcessorType_ID = 0; - IMP_ProcessorType_ID = mImportProcessor.getIMP_Processor_Type_ID(); - X_IMP_Processor_Type impProcessor_Type = new X_IMP_Processor_Type(mImportProcessor.getCtx(), IMP_ProcessorType_ID, trxName ); - log.fine("impProcessor_Type = " + impProcessor_Type); // TODO --- REMOVE - - String javaClass = impProcessor_Type.getJavaClass(); - IImportProcessor importProcessor = null; - try { - Class clazz = Class.forName(javaClass); - importProcessor = (IImportProcessor)clazz.newInstance(); - - importProcessor.process(mImportProcessor.getCtx(), this, trxName ); - - } catch (Exception e) { + } + log.fine("trxName = " + trxName); + log.fine("ImportProcessor = " + mImportProcessor); + + int IMP_ProcessorType_ID = 0; + IMP_ProcessorType_ID = mImportProcessor.getIMP_Processor_Type_ID(); + X_IMP_Processor_Type impProcessor_Type = new X_IMP_Processor_Type(mImportProcessor.getCtx(), IMP_ProcessorType_ID, trxName ); + log.fine("impProcessor_Type = " + impProcessor_Type); // TODO --- REMOVE + + String javaClass = impProcessor_Type.getJavaClass(); + IImportProcessor importProcessor = null; + try + { + Class clazz = Class.forName(javaClass); + importProcessor = (IImportProcessor)clazz.newInstance(); + importProcessor.process(mImportProcessor.getCtx(), this, trxName ); + } + catch (Exception e) + { isProcessRunning = false; log.fine("ReplicationProcessor caught an exception !!!" ); e.printStackTrace(); - - log.severe( e.getMessage() ); - + log.severe(e.getMessage()); MIMPProcessorLog pLog = new MIMPProcessorLog(mImportProcessor, e.getMessage() ); pLog.setReference("#" + String.valueOf(p_runCount) + " - " + TimeUtil.formatElapsed(new Timestamp(p_startWork))); - boolean resultSave = pLog.save(); - - try { - importProcessor.stop(); - } catch (Exception e1) { - e1.printStackTrace(); - MIMPProcessorLog pLog2 = new MIMPProcessorLog(mImportProcessor, e1.getMessage() ); - - boolean resultSave2 = pLog2.save(); + pLog.saveEx(); + try + { + importProcessor.stop(); } - } - + catch (Exception e1) + { + e1.printStackTrace(); + MIMPProcessorLog pLog2 = new MIMPProcessorLog(mImportProcessor, e1.getMessage() ); + pLog2.saveEx(); + } + } // int no = mImportProcessor.deleteLog(); m_summary.append("Logs Records deleted=").append(no).append("; "); // MIMPProcessorLog pLog = new MIMPProcessorLog(mImportProcessor, m_summary.toString()); pLog.setReference("#" + String.valueOf(p_runCount) + " - " + TimeUtil.formatElapsed(new Timestamp(p_startWork))); - boolean resultSave = pLog.save(); + pLog.saveEx(); } }