AnsweredAssumed Answered

Parallel getway fork can't execute task service concurrently

Question asked by craig_wu9 on Jul 24, 2012
Latest reply on Jul 25, 2012 by ronald.van.kuijk
I got tow task service named task1 and task2, both with activiti:async="true" set.
And I have active the JobExecutor at activiti.cfg.xml. But the result is the two tasks executed sequence.
It will print task1:1 to task1:49 and pause, then continue to print task1: 50 to task1: 100. After that start to print task2:0 to task2:10.

I have checked the source code of Activiti Engine with debug. And found in my case, at DefaultJobExecutor 's blew method
  public void executeJobs(List<String> jobIds) {
    try {
         threadPoolExecutor.execute(new ExecuteJobsRunnable(this, jobIds));
    }catch (RejectedExecutionException e) {
      rejectedJobsHandler.jobsRejected(this, jobIds);
    }
  }
The paramenter jobIds has tow data, and I checked the db, the tow job ids just represent the task1 and task2. So in my opinion, the tow tasks are executed in one process, Is that true?

Or same thing mistake I have made?


my activiti.cfg.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd">
  <bean id="processEngineConfiguration" class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
    <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/act2" />
    <property name="jdbcDriver" value="com.mysql.jdbc.Driver" />
    <property name="jdbcUsername" value="act" />
    <property name="jdbcPassword" value="act" />
   
    <!– Database configurations –>
    <property name="databaseSchemaUpdate" value="true" />
   
    <!– job executor configurations –>
    <property name="jobExecutorActivate" value="true" />
   
    <!– mail server configurations –>
    <property name="mailServerPort" value="5025" />   
  </bean>

</beans>

my bpmn xml file
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
  xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
  xmlns:activiti="http://activiti.org/bpmn
  targetNamespace="Examples">

  <process id="test">

    <startEvent id="start" />
   
    <parallelGateway id="fork" />
   
    <sequenceFlow id="flow1" sourceRef="start" targetRef="fork" />
   
    <serviceTask id="task1" name="Task1" activiti:async="true" activiti:class="psn.wxf.test.paralleltest.Task1" />
    <serviceTask id="task2" name="Task2" activiti:async="true" activiti:class="psn.wxf.test.paralleltest.Task2" />
   
   
    <sequenceFlow id="flow2" sourceRef="fork" targetRef="task1" />
    <sequenceFlow id="flow3" sourceRef="fork" targetRef="task2" />
   
    <parallelGateway id="join" />
   
    <sequenceFlow id="flow4" sourceRef="task1" targetRef="join" />
    <sequenceFlow id="flow5" sourceRef="task2" targetRef="join" />
   
    <serviceTask id="task3" name="Task3" activiti:class="psn.wxf.test.paralleltest.Task3" />
   
    <sequenceFlow id="flow7" sourceRef="join" targetRef="task3" />
   
    <sequenceFlow id="flow8" sourceRef="task3" targetRef="theEnd" />

    <endEvent id="theEnd" />

  </process>

</definitions>

my Task1 java class
public class Task1 implements JavaDelegate {

   @Override
   public void execute(DelegateExecution execution) throws Exception {
      for(int i=0;i<100;i++) {
         if (i==50) Thread.sleep(1000);
         System.out.println("Task1:"+i);
      }
   }

}
my task2 java class
public class Task2 implements JavaDelegate {

   @Override
   public void execute(DelegateExecution execution) throws Exception {
      for(int j=0;j<10;j++) {
         if (j==5) {
            j=5;
         }
         System.out.println("Task2:"+j);
      }
   }

}

my main java class, with bpmn file deployed before.
public class EngineTest {
   public static void main(String[] args) {
      ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
      RepositoryService repositoryService = processEngine
            .getRepositoryService();
      RuntimeService runtimeService = processEngine.getRuntimeService();
      runtimeService.startProcessInstanceByKey("test");
      processEngine.close();
   }
}

Outcomes