IDEMPIERE-4056 Implement cluster support for idempiere Monitor and Server Manager. Fix deadlock, add scheduler api.

This commit is contained in:
Heng Sin Low 2019-10-03 20:09:43 +08:00
parent c1d0f137cd
commit 8e8430b35f
11 changed files with 565 additions and 49 deletions

View File

@ -96,7 +96,14 @@ public class MScheduler extends X_AD_Scheduler
*/
public String getServerID ()
{
	// get_ID() can be 0 while get_IDOld() still holds a value
	// (presumably once the record has been deleted - TODO confirm against PO).
	// In that case keep using the previous ID so the server name stays the same.
	int id = get_ID();
	if (id == 0 && get_IDOld() > 0)
	{
		id = get_IDOld();
	}
	return "Scheduler" + id;
}	//	getServerID
/**

View File

@ -3,6 +3,7 @@ package org.idempiere.distributed;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public interface ICacheService {
@ -11,4 +12,28 @@ public interface ICacheService {
public <K>List<K> getList(String name);
public <K>Set<K> getSet(String name);
/**
* Tries to acquire the lock for the specified key.
* If the lock is not available, then the current thread becomes disabled for thread scheduling purposes and lies dormant
* until one of two things happens - the lock is acquired by the current thread, or the specified waiting time elapses.
*
* @param map map that owns the key to lock. NOTE(review): the Hazelcast implementation only locks
* when the map is a Hazelcast IMap and returns false for any other map type.
* @param key key to lock
* @param timeout maximum time to wait for the lock
* @param timeunit time unit of the timeout argument
* @return true if lock is acquired, false otherwise
* @throws InterruptedException presumably if the current thread is interrupted while waiting - confirm against the implementation
*/
public <K, V>boolean tryLock(Map<K, V> map, K key, long timeout, TimeUnit timeunit) throws InterruptedException;
/**
* Releases the lock for the specified key. It never blocks and returns immediately. If the current thread is the holder
* of this lock, then the hold count is decremented. If the hold count is zero, then the lock is released.
* If the current thread is not the holder of this lock, then IllegalMonitorStateException is thrown.
*
* @param map map that owns the locked key. NOTE(review): the Hazelcast implementation silently does nothing
* when the map is not a Hazelcast IMap.
* @param key key to unlock
*/
public <K, V>void unLock(Map<K, V> map, K key);
}

View File

@ -23,6 +23,8 @@ Import-Package: javax.jms;version="1.1.0",
org.apache.ecs.xhtml,
org.apache.poi.hssf.usermodel,
org.osgi.framework;version="1.6.0",
org.osgi.service.component.annotations;version="1.3.0",
org.osgi.service.event;version="1.4.0",
org.osgi.util.tracker;version="1.5.0",
org.restlet,
org.restlet.data,

View File

@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="org.adempiere.server.scheduler.model.event.SchedulerModelEventHandler">
<reference bind="bindEventManager" cardinality="1..1" interface="org.adempiere.base.event.IEventManager" name="eventManager" policy="static" unbind="unbindEventManager"/>
<implementation class="org.adempiere.server.scheduler.model.event.SchedulerModelEventHandler"/>
</scr:component>

View File

@ -0,0 +1,133 @@
/**********************************************************************
* This file is part of iDempiere ERP Open Source *
* http://www.idempiere.org *
* *
* Copyright (C) Contributors *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software *
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, *
* MA 02110-1301, USA. *
* *
* Contributors: *
* - Trek Global Corporation *
* - Heng Sin Low *
**********************************************************************/
package org.adempiere.server.scheduler.model.event;
import java.util.logging.Level;
import org.adempiere.base.event.AbstractEventHandler;
import org.adempiere.base.event.IEventManager;
import org.adempiere.base.event.IEventTopics;
import org.compiere.model.I_AD_Scheduler;
import org.compiere.model.MScheduler;
import org.compiere.server.AdempiereServerMgr;
import org.compiere.server.IServerManager;
import org.compiere.util.CLogger;
import org.compiere.util.Trx;
import org.compiere.util.TrxEventListener;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.event.Event;
/**
* @author hengsin
*
*/
@Component(name = "org.adempiere.server.scheduler.model.event.SchedulerModelEventHandler",
reference = {@Reference(name = "eventManager", service = IEventManager.class, bind = "bindEventManager",
unbind = "unbindEventManager", cardinality = ReferenceCardinality.MANDATORY,
policy = ReferencePolicy.STATIC)})
public class SchedulerModelEventHandler extends AbstractEventHandler {
/**
* default constructor
*/
public SchedulerModelEventHandler() {
}
/* (non-Javadoc)
* @see org.adempiere.base.event.AbstractEventHandler#doHandleEvent(org.osgi.service.event.Event)
*/
@Override
protected void doHandleEvent(Event event) {
if(event.getTopic().equals(IEventTopics.PO_BEFORE_CHANGE)) {
MScheduler scheduler = (MScheduler) getPO(event);
if (scheduler.isActive() && !scheduler.is_new() && scheduler.is_ValueChanged(I_AD_Scheduler.COLUMNNAME_AD_Schedule_ID)) {
Trx trx = Trx.get(scheduler.get_TrxName(), false);
trx.addTrxEventListener(new TrxEventListener() {
@Override
public void afterRollback(Trx trx, boolean success) {
}
@Override
public void afterCommit(Trx trx, boolean success) {
if (success) {
//restart if server instance is local
IServerManager serverMgr = AdempiereServerMgr.get(false);
if (serverMgr != null) {
try {
int state = serverMgr.getServerStatus(scheduler.getServerID());
if (state == IServerManager.SERVER_STATE_STARTED) {
String error = serverMgr.stop(scheduler.getServerID());
if (error == null) {
serverMgr.start(scheduler.getServerID());
}
}
} catch (Exception e) {
CLogger.getCLogger(getClass()).log(Level.SEVERE, e.getMessage(), e);
}
}
}
}
@Override
public void afterClose(Trx trx) {
}
});
}
} else if(event.getTopic().equals(IEventTopics.PO_AFTER_CHANGE)) {
MScheduler scheduler = (MScheduler) getPO(event);
if (!scheduler.isActive()) {
//remove from local servermgr
IServerManager serverMgr = AdempiereServerMgr.get(false);
if (serverMgr != null) {
if (serverMgr.getServerInstance(scheduler.getServerID()) != null)
serverMgr.removeScheduler(scheduler);
}
}
} else if(event.getTopic().equals(IEventTopics.PO_AFTER_DELETE)) {
MScheduler scheduler = (MScheduler) getPO(event);
//remove from local servermgr
IServerManager serverMgr = AdempiereServerMgr.get(false);
if (serverMgr != null) {
if (serverMgr.getServerInstance(scheduler.getServerID()) != null)
serverMgr.removeScheduler(scheduler);
}
}
}
/* (non-Javadoc)
* @see org.adempiere.base.event.AbstractEventHandler#initialize()
*/
@Override
protected void initialize() {
registerTableEvent(IEventTopics.PO_BEFORE_CHANGE, I_AD_Scheduler.Table_Name);
registerTableEvent(IEventTopics.PO_AFTER_CHANGE, I_AD_Scheduler.Table_Name);
registerTableEvent(IEventTopics.PO_AFTER_DELETE, I_AD_Scheduler.Table_Name);
}
}

View File

@ -20,12 +20,14 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.adempiere.base.IServiceHolder;
import org.adempiere.base.Service;
import org.adempiere.server.AdempiereServerActivator;
import org.adempiere.server.IServerFactory;
@ -35,7 +37,9 @@ import org.compiere.model.MScheduler;
import org.compiere.model.MSession;
import org.compiere.util.CLogger;
import org.compiere.util.Env;
import org.idempiere.server.cluster.ClusterServerMgr;
import org.idempiere.distributed.ICacheService;
import org.idempiere.distributed.IClusterMember;
import org.idempiere.distributed.IClusterService;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.BundleListener;
import org.osgi.framework.ServiceReference;
@ -133,6 +137,34 @@ public class AdempiereServerMgr implements ServiceTrackerCustomizer<IServerFacto
if (stopAll() != null)
return "Failed to stop all servers";
String clusterId = getClusterMemberId();
if (clusterId != null) {
Map<String, String> map = getServerOwnerMap();
if (map != null) {
ICacheService cacheService = getCacheService();
try {
String reloadLockKey = "cluster.server.owner.map.reload";
if (cacheService.tryLock(map, reloadLockKey, 30, TimeUnit.SECONDS)) {
try {
List<String> toRemove = new ArrayList<>();
for(Map.Entry<String, String> entry : map.entrySet()) {
if (entry.getValue().equals(clusterId)) {
toRemove.add(entry.getKey());
}
}
for(String key : toRemove) {
map.remove(key);
}
} finally {
cacheService.unLock(map, reloadLockKey);
}
}
} catch (Exception e) {
return "Failed to lock cluster server owner map";
}
}
}
int noServers = 0;
m_servers=new ArrayList<LocalServerController>();
processorClass = new HashSet<String>();
@ -163,6 +195,31 @@ public class AdempiereServerMgr implements ServiceTrackerCustomizer<IServerFacto
{
AdempiereProcessor model = server.getModel();
if (canRunHere(server, model)) {
String clusterId = getClusterMemberId();
if (clusterId != null) {
Map<String, String> map = getServerOwnerMap();
if (map != null) {
ICacheService cacheService = getCacheService();
try {
if (cacheService.tryLock(map, server.getServerID(), 30, TimeUnit.SECONDS)) {
try {
String memberId = map.get(server.getServerID());
if (memberId != null && !memberId.equals(clusterId)) {
continue;
} else if (memberId == null) {
map.put(server.getServerID(), clusterId);
}
} finally {
cacheService.unLock(map, server.getServerID());
}
} else {
continue;
}
} catch (Exception e) {
continue;
}
}
}
m_servers.add(new LocalServerController(server));
}
}
@ -171,19 +228,19 @@ public class AdempiereServerMgr implements ServiceTrackerCustomizer<IServerFacto
}
/**
 * Check whether the given server may be hosted by this server manager.
 * Only the IP check of the processor model is evaluated here; which cluster
 * member owns a server is decided by the callers through the cluster server owner map.
 *
 * @param server server candidate (not inspected by this check)
 * @param model processor model of the server
 * @return result of {@code AdempiereServer.isOKtoRunOnIP(model)}
 */
private boolean canRunHere(AdempiereServer server, AdempiereProcessor model) {
	return AdempiereServer.isOKtoRunOnIP(model);
}
/**
* @param scheduler
* @return true
* @return error
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public boolean addScheduler(MScheduler scheduler) {
@Override
public String addScheduler(MScheduler scheduler) {
String serverId = scheduler.getServerID();
if (getServerInstance(serverId) != null)
return false;
return null;
//osgi server
List<IServerFactory> serverFactoryList = Service.locator().list(IServerFactory.class).getServices();
@ -194,14 +251,78 @@ public class AdempiereServerMgr implements ServiceTrackerCustomizer<IServerFacto
if (factory.getProcessorClass().getName().equals(scheduler.getClass().getName())) {
AdempiereServer server = factory.create(m_ctx, scheduler);
if (server != null && canRunHere(server, scheduler)) {
m_servers.add(new LocalServerController(server));
return start(serverId)==null;
if (getServerInstance(scheduler.getServerID()) == null) {
String clusterId = getClusterMemberId();
if (clusterId != null) {
Map<String, String> map = getServerOwnerMap();
if (map != null) {
ICacheService cacheService = getCacheService();
try {
if (cacheService.tryLock(map, server.getServerID(), 30, TimeUnit.SECONDS)) {
try {
String memberId = map.get(server.getServerID());
if (memberId != null && !memberId.equals(clusterId)) {
continue;
} else if (memberId == null) {
map.put(server.getServerID(), clusterId);
}
} finally {
cacheService.unLock(map, server.getServerID());
}
} else {
continue;
}
} catch (Exception e) {
continue;
}
}
}
m_servers.add(new LocalServerController(server, false));
return start(serverId);
}
}
}
}
}
return false;
return null;
}
/**
 * Stop (when alive) and drop the local server of the given scheduler, then
 * release this member's entry in the cluster server owner map.
 *
 * @param scheduler scheduler to remove
 * @return error reported while stopping the server, otherwise null
 */
@Override
public synchronized String removeScheduler(MScheduler scheduler) {
	String serverId = scheduler.getServerID();
	LocalServerController current = getLocalServerController(serverId);
	if (current == null)
		return null;

	if (current.isAlive()) {
		String stopError = stop(serverId);
		if (stopError != null)
			return stopError;
	}

	// locate the first controller registered under this server id
	int index = -1;
	for (int i = 0; i < m_servers.size() && index < 0; i++) {
		if (serverId.equals(m_servers.get(i).server.getServerID()))
			index = i;
	}
	if (index < 0)
		return null;
	m_servers.remove(index);

	// give up ownership in the cluster map, but only when this member is the owner
	String clusterId = getClusterMemberId();
	if (clusterId == null)
		return null;
	Map<String, String> ownerMap = getServerOwnerMap();
	if (ownerMap != null && clusterId.equals(ownerMap.get(serverId))) {
		ownerMap.remove(serverId);
	}
	return null;
}
/**
@ -473,7 +594,7 @@ public class AdempiereServerMgr implements ServiceTrackerCustomizer<IServerFacto
@Override
public ServerInstance[] getServerInstances() {
public synchronized ServerInstance[] getServerInstances() {
List<ServerInstance> responses = new ArrayList<>();
LocalServerController[] controllers = getLocalServerControllers();
for (LocalServerController controller : controllers) {
@ -590,9 +711,14 @@ public class AdempiereServerMgr implements ServiceTrackerCustomizer<IServerFacto
protected AdempiereServer server;
protected volatile ScheduledFuture<?> scheduleFuture;
public LocalServerController(AdempiereServer server) {
private LocalServerController(AdempiereServer server) {
this(server, true);
}
private LocalServerController(AdempiereServer server, boolean start) {
this.server = server;
start();
if (start)
start();
}
public void start() {
@ -713,4 +839,34 @@ public class AdempiereServerMgr implements ServiceTrackerCustomizer<IServerFacto
return null;
}
/**
 * Look up the cluster service through the service locator.
 *
 * @return cluster service, or null when no service holder or service is available
 */
private IClusterService getClusterService() {
	IServiceHolder<IClusterService> serviceHolder = Service.locator().locate(IClusterService.class);
	if (serviceHolder == null)
		return null;
	return serviceHolder.getService();
}
/**
 * Get the id of the local cluster member.
 *
 * @return local member id, or null when there is no cluster service or no local member
 */
private String getClusterMemberId() {
	IClusterService clusterService = getClusterService();
	if (clusterService == null)
		return null;
	IClusterMember localMember = clusterService.getLocalMember();
	return localMember != null ? localMember.getId() : null;
}
/**
 * Look up the distributed cache service through the service locator.
 *
 * @return cache service, or null when no service holder or service is available
 */
private ICacheService getCacheService() {
	IServiceHolder<ICacheService> serviceHolder = Service.locator().locate(ICacheService.class);
	if (serviceHolder == null)
		return null;
	return serviceHolder.getService();
}
/**
 * Get the map named "cluster.server.owner.map" from the cache service.
 * Callers in this class store the owning cluster member id under the server id.
 *
 * @return server owner map, or null when the cache service is not available
 */
private Map<String, String> getServerOwnerMap() {
	ICacheService cacheService = getCacheService();
	if (cacheService == null)
		return null;
	return cacheService.getMap("cluster.server.owner.map");
}
} // AdempiereServerMgr

View File

@ -27,6 +27,8 @@ package org.compiere.server;
import java.sql.Timestamp;
import org.compiere.model.MScheduler;
/**
*
* @author hengsin
@ -118,4 +120,16 @@ public interface IServerManager {
*/
public String getDescription();
/**
* Add a server for the given scheduler to this server manager.
* @param scheduler scheduler to add
* @return error message, or null if no error was reported
*/
public String addScheduler(MScheduler scheduler);
/**
* Remove the server of the given scheduler from this server manager.
*
* @param scheduler scheduler to remove
* @return error message, or null if no error was reported
*/
public String removeScheduler(MScheduler scheduler);
}

View File

@ -39,16 +39,19 @@ import java.util.concurrent.Future;
import org.adempiere.base.IServiceHolder;
import org.adempiere.base.Service;
import org.compiere.Adempiere;
import org.compiere.model.MScheduler;
import org.compiere.server.IServerManager;
import org.compiere.server.ServerCount;
import org.compiere.server.ServerInstance;
import org.idempiere.distributed.IClusterMember;
import org.idempiere.distributed.IClusterService;
import org.idempiere.server.cluster.callable.AddSchedulerCallable;
import org.idempiere.server.cluster.callable.GetAllCallable;
import org.idempiere.server.cluster.callable.GetServerCallable;
import org.idempiere.server.cluster.callable.GetServerCountCallable;
import org.idempiere.server.cluster.callable.GetStartTimeCallable;
import org.idempiere.server.cluster.callable.ReloadCallable;
import org.idempiere.server.cluster.callable.RemoveSchedulerCallable;
import org.idempiere.server.cluster.callable.Response;
import org.idempiere.server.cluster.callable.RunNowCallable;
import org.idempiere.server.cluster.callable.StartAllCallable;
@ -374,48 +377,59 @@ public class ClusterServerMgr implements IServerManager {
}
}
/**
* find server instance from non-local nodes
* @param serverId
* @return ServerInstance
*/
public ServerInstance getServerInstanceAtOtherMembers(String serverId) {
@Override
public String addScheduler(MScheduler scheduler) {
IClusterService service = getClusterService();
if (service == null)
return null;
return "Cluster service not available";
GetServerCallable callable = new GetServerCallable(serverId);
Collection<IClusterMember> members = service.getMembers();
if (members == null || members.isEmpty())
return null;
final IClusterMember local = service.getLocalMember();
if (local == null)
return null;
List<IClusterMember> others = new ArrayList<>();
members.forEach(e -> {
if (!e.getId().equals(local.getId())) {
others.add(e);
}
});
if (others.size() > 0) {
Map<IClusterMember, Future<ServerInstance>> futureMap = service.execute(callable, others);
if (futureMap != null) {
try {
Set<Entry<IClusterMember, Future<ServerInstance>>> results = futureMap.entrySet();
for(Entry<IClusterMember, Future<ServerInstance>> f : results) {
ServerInstance i = f.getValue().get();
if (i != null) {
i.setClusterMember(f.getKey());
return i;
}
AddSchedulerCallable callable = new AddSchedulerCallable(scheduler);
Map<IClusterMember, Future<Response>> futureMap = service.execute(callable, service.getMembers());
if (futureMap != null) {
try {
Collection<Future<Response>> results = futureMap.values();
for(Future<Response> f : results) {
Response response = f.get();
if (response != null && response.getServerId() != null) {
return response.getError();
}
} catch (InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
} catch (InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
return null;
} else {
return "Failed to send add scheduler request through cluster servie";
}
}
/**
 * Send a remove scheduler request to all cluster members and return the outcome
 * of the first member that reports a server id in its response.
 *
 * @param scheduler scheduler to remove
 * @return error message, or null if no member reported an error
 * @throws RuntimeException if waiting for a member's response is interrupted or the remote call fails
 */
@Override
public String removeScheduler(MScheduler scheduler) {
	IClusterService service = getClusterService();
	if (service == null)
		return "Cluster service not available";

	RemoveSchedulerCallable callable = new RemoveSchedulerCallable(scheduler);
	Map<IClusterMember, Future<Response>> futureMap = service.execute(callable, service.getMembers());
	if (futureMap == null)
		return "Failed to send remove scheduler request through cluster service";

	try {
		for (Future<Response> f : futureMap.values()) {
			Response response = f.get();
			// a response without server id means that member did not host the scheduler
			if (response != null && response.getServerId() != null) {
				return response.getError();
			}
		}
	} catch (InterruptedException e) {
		// keep the interrupt visible to callers further up the stack
		Thread.currentThread().interrupt();
		throw new RuntimeException(e.getMessage(), e);
	} catch (ExecutionException e) {
		throw new RuntimeException(e.getMessage(), e);
	}
	return null;
}
}

View File

@ -0,0 +1,73 @@
/**********************************************************************
* This file is part of iDempiere ERP Open Source *
* http://www.idempiere.org *
* *
* Copyright (C) Contributors *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software *
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, *
* MA 02110-1301, USA. *
* *
* Contributors: *
* - Trek Global Corporation *
* - Heng Sin Low *
**********************************************************************/
package org.idempiere.server.cluster.callable;
import java.io.Serializable;
import java.util.concurrent.Callable;
import org.compiere.model.MScheduler;
import org.compiere.server.AdempiereServerMgr;
import org.compiere.server.IServerManager;
/**
 * Serializable task that asks the server manager returned by
 * {@code AdempiereServerMgr.get(false)} to add a scheduler.
 * The response carries the scheduler's server id when the add reported an error
 * or when the server manager knows a server instance for the scheduler afterwards;
 * otherwise the response is left empty.
 *
 * @author hengsin
 */
public class AddSchedulerCallable implements Callable<Response>, Serializable {
	/**
	 * generated serial id
	 */
	private static final long serialVersionUID = -9074664311726118178L;

	/** scheduler to add */
	private MScheduler scheduler;

	/**
	 * @param scheduler scheduler to add
	 */
	public AddSchedulerCallable(MScheduler scheduler) {
		this.scheduler = scheduler;
	}

	@Override
	public Response call() throws Exception {
		Response result = new Response();
		IServerManager manager = AdempiereServerMgr.get(false);
		if (manager == null) {
			return result;
		}

		String addError = manager.addScheduler(scheduler);
		// report back on failure, or when the server instance exists after the add
		if (addError != null || manager.getServerInstance(scheduler.getServerID()) != null) {
			result.error = addError;
			result.serverId = scheduler.getServerID();
		}
		return result;
	}
}

View File

@ -0,0 +1,67 @@
/**********************************************************************
* This file is part of iDempiere ERP Open Source *
* http://www.idempiere.org *
* *
* Copyright (C) Contributors *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software *
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, *
* MA 02110-1301, USA. *
* *
* Contributors: *
* - Trek Global Corporation *
* - Heng Sin Low *
**********************************************************************/
package org.idempiere.server.cluster.callable;
import java.io.Serializable;
import java.util.concurrent.Callable;
import org.compiere.model.MScheduler;
import org.compiere.server.AdempiereServerMgr;
import org.compiere.server.IServerManager;
/**
 * Serializable task that asks the server manager returned by
 * {@code AdempiereServerMgr.get(false)} to remove a scheduler.
 * The response is only filled when that server manager knows a server instance
 * for the scheduler; otherwise it is left empty.
 *
 * @author hengsin
 */
public class RemoveSchedulerCallable implements Callable<Response>, Serializable {
	/**
	 * generated serial id
	 */
	private static final long serialVersionUID = 7929697664683268446L;

	/** scheduler to remove */
	private MScheduler scheduler;

	/**
	 * @param scheduler scheduler to remove
	 */
	public RemoveSchedulerCallable(MScheduler scheduler) {
		this.scheduler = scheduler;
	}

	@Override
	public Response call() throws Exception {
		Response result = new Response();
		IServerManager manager = AdempiereServerMgr.get(false);
		if (manager == null) {
			return result;
		}
		if (manager.getServerInstance(scheduler.getServerID()) == null) {
			return result;
		}
		result.error = manager.removeScheduler(scheduler);
		result.serverId = scheduler.getServerID();
		return result;
	}
}

View File

@ -16,9 +16,12 @@ package org.idempiere.hazelcast.service;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.idempiere.distributed.ICacheService;
import com.hazelcast.core.IMap;
/**
* @author hengsin
*
@ -58,4 +61,21 @@ public class CacheServiceImpl implements ICacheService {
return null;
}
/**
 * Try to lock the key through the Hazelcast map API.
 * Maps that are not a Hazelcast {@code IMap} cannot be locked and yield false.
 */
@Override
public <K, V> boolean tryLock(Map<K, V> map, K key, long timeout, TimeUnit timeunit) throws InterruptedException {
	if (!(map instanceof IMap<?, ?>)) {
		return false;
	}
	IMap<K, V> hazelcastMap = (IMap<K, V>) map;
	return hazelcastMap.tryLock(key, timeout, timeunit);
}
/**
 * Unlock the key through the Hazelcast map API.
 * Nothing happens for maps that are not a Hazelcast {@code IMap}.
 */
@Override
public <K, V> void unLock(Map<K, V> map, K key) {
	if (!(map instanceof IMap<?, ?>)) {
		return;
	}
	IMap<K, V> hazelcastMap = (IMap<K, V>) map;
	hazelcastMap.unlock(key);
}
}