
Correct way to signal a receiveTask

Question asked by exp2345 on Aug 19, 2014
Latest reply on Aug 21, 2014 by trademak
I am attempting to signal a receive task from a delegate Java thread started from a Service Task and have run into two issues with the implementation.

I have already read the threads http://forums.activiti.org/content/error-while-signaling-activity-receive-task and http://forums.activiti.org/content/signalling-process-fails-intermittently. My sample code is much simpler (no JMS), but the scenario is similar.

The two approaches I have tried are:

POLLING

Repeatedly calling RuntimeService.signal(executionId) until it succeeds. It works, but I'm not happy with the robustness of this solution.

RuntimeService.signal() throws a NullPointerException instead of an ActivitiException. I'm not sure if this is a bug or expected behaviour.
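
For reference, the polling idea can be reduced to a small bounded-retry helper. This is only a sketch in plain Java: `BoundedRetry`, `retry` and `RetriesExhaustedException` are hypothetical names, not Activiti API, and the helper catches any exception from the action because the failure seen here is a NullPointerException rather than an ActivitiException.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Activiti: retries an action a bounded number of times.
class BoundedRetry {

    /** Thrown once every attempt has failed; the cause is the last failure seen. */
    static class RetriesExhaustedException extends RuntimeException {
        RetriesExhaustedException(int attempts, Throwable cause) {
            super("Gave up after " + attempts + " attempt(s)", cause);
        }
    }

    /** Runs the action until it succeeds or maxAttempts is reached, sleeping between attempts. */
    static <T> T retry(Callable<T> action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e; // e.g. the NullPointerException seen from signal()
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for callers
                    throw new RetriesExhaustedException(attempt, e);
                }
            }
        }
        throw new RetriesExhaustedException(maxAttempts, lastFailure);
    }
}
```

In the delegate thread, the action passed to `retry` would wrap the call to `runtimeService.signal(executionId)`. The retry count and delay are arbitrary tuning values, which is exactly the robustness concern with this approach.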

EVENT LISTENER

I expected that registering an event listener for ACTIVITY_STARTED before leaving the Service Task would let me wait until the receive task has started before starting the delegate thread. However, when I query for the Execution using the executionId from the ActivitiEvent, RuntimeService returns a null execution, which prevents me from verifying that the correct task has started. Is this expected behaviour?
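
The handoff I am after, stripped of Activiti, looks roughly like the sketch below: the worker blocks until something reports that the wait state was entered, and only then signals. `WaitStateHandoff`, `markWaitStateEntered` and `signalWhenReady` are hypothetical names built on `java.util.concurrent.CountDownLatch`, not Activiti API, and the sketch only helps if the hook fires once the engine state is actually visible to other threads, which is the open question here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Activiti: lets a worker wait for a "wait state entered" notice.
class WaitStateHandoff {

    private final CountDownLatch waitStateEntered = new CountDownLatch(1);

    /** Called by whatever observes that the wait state was reached (e.g. an event listener). */
    void markWaitStateEntered() {
        waitStateEntered.countDown();
    }

    /**
     * Worker side: waits up to the timeout for the notice, then runs the signal action.
     * Returns false, without running the action, on timeout or interruption.
     */
    boolean signalWhenReady(Runnable signalAction, long timeout, TimeUnit unit) {
        try {
            if (!waitStateEntered.await(timeout, unit)) {
                return false; // timed out: the wait state was never reported
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
        signalAction.run();
        return true;
    }
}
```

With this shape the delegate thread can be started immediately, and the listener only has to call `markWaitStateEntered()` instead of starting the thread itself.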

SERVICE TASK


import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngines;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventListener;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.impl.bpmn.behavior.TaskActivityBehavior;
import org.activiti.engine.impl.pvm.delegate.ActivityExecution;
import org.activiti.engine.runtime.Execution;
import org.activiti.engine.runtime.ExecutionQuery;

public class ThreadDelegate extends TaskActivityBehavior {

    @Override
    public void execute(ActivityExecution execution) throws Exception {

        System.out.println("Started thread delegate for execution id " + execution.getId());

        Object counterObj = execution.getVariable("counter");
        if(counterObj == null) {
            execution.setVariable("counter", 0);
            System.out.println("Counter initialised at 0");
        } else {
            int counter = (int) counterObj;
            counter++;
            execution.setVariable("counter", counter);
            System.out.println("Counter incremented to " + counter);
        }
        final String executionId = execution.getId();

        final Thread t = new Thread() {
            @Override
            public void run() {
                System.out.println("Thread running");

//                Forcing the thread to sleep causes the process to execute normally
//                try {
//                    Thread.sleep(5000);
//                } catch (InterruptedException e) { }

                signalWaitState();

                /* KLUDGE alert!
                 * This does work but I hope to achieve signalling without a polling loop.
                 */
                /*
                int retries = 0;
                while(retries < 5) {
                    try {
                        signalWaitState();
                        return;
                    } catch (NullPointerException e) {
                        retries++;
                        System.out.println("Waiting before retry " + retries);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e2) { }
                    }
                }
                */
            }

            /*
             * Sometimes an NPE is thrown
             *     at org.activiti.engine.impl.persistence.entity.ExecutionEntity.signal(ExecutionEntity.java:375)
             * unless the thread is forced to sleep first. Presumably this is because the wait state has not been
             * entered yet.
             */
            private void signalWaitState() {
                ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
                RuntimeService runtimeService = processEngine.getRuntimeService();
                runtimeService.signal(executionId);
                System.out.println("Signalled execution id " + executionId);
            }
        };

        /*
         * The purpose of this event listener was to ensure that the waitState has been
         * entered before starting the delegate thread. This was intended to prevent the
         * intermittent NPE when calling signal() once the delegate thread completes execution.
         *
         * Even though the executionId is valid the query returns a null execution.
         */


        ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
        RuntimeService runtimeService = processEngine.getRuntimeService();
        runtimeService.addEventListener(new ActivitiEventListener() {
            @Override
            public void onEvent(ActivitiEvent event) {
                RuntimeService runtimeService = event.getEngineServices().getRuntimeService();
                System.out.println("Event type: " + event.getType() + " executionId: " + event.getExecutionId() + " piid: " + event.getProcessInstanceId());
                // TODO: filter on executionId
                try {
                    ExecutionQuery q = runtimeService.createExecutionQuery();
                    q.executionId(event.getExecutionId());
                    Execution execution = q.singleResult();
                    String activityId = execution.getActivityId();
                    System.out.println("Activity ID: " + activityId);
                    if("waitState".equals(activityId)) {
                        t.start();
                    }
                    System.out.println("Started delegate thread.");
                } finally {
                    runtimeService.removeEventListener(this);
                }
            }

            @Override
            public boolean isFailOnException() {
                return true;
            }
        });


        this.leave(execution);
        System.out.println("Left service task");

        // Start the thread here unless using the event listener above
//        t.start();
        System.out.println("Started delegate thread.");
    }
}


BPMN

<blockcode>
<process id="simpleThreaded" name="Simple Threaded" isExecutable="true">
    <startEvent id="sid-E3A6E22C-879A-445A-A41E-54D32380A289"/>
   
    <serviceTask id="threadDelegate" activiti:class="ThreadDelegate"/>
   
    <receiveTask id="waitState"/>
   
    <exclusiveGateway id="sid-35B8B065-DCB0-44B1-B14B-DE14789F0194"/>

    <intermediateCatchEvent id="sid-8BD44534-5684-4B80-AA0B-7FA1E4C2F7A5">
      <timerEventDefinition>
        <timeDuration>PT5S</timeDuration>
      </timerEventDefinition>
    </intermediateCatchEvent>
   
    <endEvent id="sid-3E6882EB-427F-42B9-AB41-973B95024CBE"/>

    <sequenceFlow id="sid-E62C4877-7B6B-4BAE-9708-89540EC67DE8" sourceRef="sid-8BD44534-5684-4B80-AA0B-7FA1E4C2F7A5" targetRef="threadDelegate"/>
    <sequenceFlow id="sid-C86D79E2-521F-4B97-8695-4EC1315D6232" sourceRef="waitState" targetRef="sid-35B8B065-DCB0-44B1-B14B-DE14789F0194"/>
    <sequenceFlow id="sid-C3EB1FBA-316A-41FE-B29C-B9B6051B6C59" sourceRef="sid-E3A6E22C-879A-445A-A41E-54D32380A289" targetRef="threadDelegate"/>
    <sequenceFlow id="out" sourceRef="threadDelegate" targetRef="waitState"/>
    <sequenceFlow id="sid-43094329-90FD-4E8E-890F-2667044EBD2B" sourceRef="sid-35B8B065-DCB0-44B1-B14B-DE14789F0194" targetRef="sid-3E6882EB-427F-42B9-AB41-973B95024CBE">
      <conditionExpression xsi:type="tFormalExpression"><![CDATA[${counter > 3}]]></conditionExpression>
    </sequenceFlow>
    <sequenceFlow id="sid-CE41E03C-2AB5-4B71-A04D-AD22ED8A06F0" sourceRef="sid-35B8B065-DCB0-44B1-B14B-DE14789F0194" targetRef="sid-8BD44534-5684-4B80-AA0B-7FA1E4C2F7A5">
      <conditionExpression xsi:type="tFormalExpression"><![CDATA[${counter <= 3}]]></conditionExpression>
    </sequenceFlow>
</process>
</blockcode>

OUTCOMES