AnsweredAssumed Answered

Jobs and Clusters

Question asked by yblazart on Jul 18, 2012
Latest reply on Jul 18, 2012 by yblazart
Hi! I want to install multiple instances of Activiti, all sharing the same database.

In my workflows I will have, for example, some timers. I ran some tests.
From what I have read, it seems that the JobExecutor takes a 'lock' in the database for each job it is about to execute, which ensures that the job executor on the other server will not attempt to execute the same job (see AcquireJobsCmd, AcquireJobsRunnable, JobExecutor). OK, that's great.

But in my tests, I see a lot of stack traces like this:

18 juil. 2012 12:07:01 org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable run
GRAVE: exception during job acquisition: TimerEntity[246a2666-d0c0-11e1-8fa6-001b63a220cd] was updated by another transaction concurrently
org.activiti.engine.ActivitiOptimisticLockingException: TimerEntity[246a2666-d0c0-11e1-8fa6-001b63a220cd] was updated by another transaction concurrently
   at org.activiti.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:452)
   at org.activiti.engine.impl.db.DbSqlSession.flush(DbSqlSession.java:348)
   at org.activiti.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:149)
   at org.activiti.engine.impl.interceptor.CommandContext.close(CommandContext.java:105)
   at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:49)
   at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:33)
   at org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable.run(AcquireJobsRunnable.java:57)
   at java.lang.Thread.run(Thread.java:680)

In my opinion, this ActivitiOptimisticLockingException is not an error (SEVERE or otherwise) in this case, so we should not see such a thing in the logs.

Do you have something planned for this, or can I propose a patch? The change would be located in AcquireJobsRunnable:



public class AcquireJobsRunnable implements Runnable {

    private static Logger log = Logger.getLogger(AcquireJobsRunnable.class.getName());
    protected final JobExecutor jobExecutor;
    protected volatile boolean isInterrupted = false;
    protected volatile boolean isJobAdded = false;
    protected final Object MONITOR = new Object();
    protected final AtomicBoolean isWaiting = new AtomicBoolean(false);

    public AcquireJobsRunnable(JobExecutor jobExecutor) {
        this.jobExecutor = jobExecutor;
    }

    public synchronized void run() {
        log.info(jobExecutor.getName() + " starting to acquire jobs");

        final CommandExecutor commandExecutor = jobExecutor.getCommandExecutor();
        long millisToWait = 0;
        float waitIncreaseFactor = 2;
        long maxWait = 60 * 1000;

        while (!isInterrupted) {
            int maxJobsPerAcquisition = jobExecutor.getMaxJobsPerAcquisition();

            try {
                AcquiredJobs acquiredJobs = null;
                boolean jobsLockAcquired;
                try {
                    acquiredJobs = commandExecutor.execute(jobExecutor.getAcquireJobsCmd());
                    jobsLockAcquired = true;
                } catch (ActivitiOptimisticLockingException lockException) {
                    // lock can't be acquired,
                    jobsLockAcquired = false;
                    log.log(Level.INFO,"some Jobs are already locked");
                }
                if (jobsLockAcquired) {
                    for (List<String> jobIds : acquiredJobs.getJobIdBatches()) {
                        jobExecutor.executeJobs(jobIds);
                    }

                    // if all jobs were executed
                    millisToWait = jobExecutor.getWaitTimeInMillis();
                    int jobsAcquired = acquiredJobs.getJobIdBatches().size();
                    if (jobsAcquired < maxJobsPerAcquisition) {

                        isJobAdded = false;

                        // check if the next timer should fire before the normal sleep time is over
                        Date duedate = new Date(ClockUtil.getCurrentTime().getTime() + millisToWait);
                        List<TimerEntity> nextTimers = commandExecutor.execute(new GetUnlockedTimersByDuedateCmd(duedate, new Page(0, 1)));

                        if (!nextTimers.isEmpty()) {
                            long millisTillNextTimer = nextTimers.get(0).getDuedate().getTime() - ClockUtil.getCurrentTime().getTime();
                            if (millisTillNextTimer < millisToWait) {
                                millisToWait = millisTillNextTimer;
                            }
                        }

                    } else {
                        millisToWait = 0;
                    }
                }
            } catch (Exception e) {
                log.log(Level.SEVERE, "exception during job acquisition: " + e.getMessage(), e);
                millisToWait *= waitIncreaseFactor;
                if (millisToWait > maxWait) {
                    millisToWait = maxWait;
                } else if (millisToWait == 0) {
                    millisToWait = jobExecutor.getWaitTimeInMillis();
                }
            }

            if ((millisToWait > 0) && (!isJobAdded)) {
                try {
                    log.fine("job acquisition thread sleeping for " + millisToWait + " millis");
                    synchronized (MONITOR) {
                        if (!isInterrupted) {
                            isWaiting.set(true);
                            MONITOR.wait(millisToWait);
                        }
                    }
                    log.fine("job acquisition thread woke up");
                } catch (InterruptedException e) {
                    log.fine("job acquisition wait interrupted");
                } finally {
                    isWaiting.set(false);
                }
            }
        }
        log.info(jobExecutor.getName() + " stopped job acquisition");
    }

    public void stop() {
        synchronized (MONITOR) {
            isInterrupted = true;
            if (isWaiting.compareAndSet(true, false)) {
                MONITOR.notifyAll();
            }
        }
    }

    public void jobWasAdded() {
        isJobAdded = true;
        if (isWaiting.compareAndSet(true, false)) {
            // ensures we only notify once
            // I am OK with the race condition     
            synchronized (MONITOR) {
                MONITOR.notifyAll();
            }
        }
    }
}

Outcomes