在Cadence/Temporal工作流中,最佳的信号处理方式/模式是什么?

5

当使用文档建议的信号时:

public class MyWorkflow{
   public Output myWorkflwMethod(Input input){
      ...
   }

   public void mySignalMethod(request){
     // do actual processing here. 
     ...
   }
}

我可能会遇到以下问题:

  1. 我想要保证同一信号名称内或跨所有信号名称的FIFO处理一次。
  2. 我想要处理signalWithStart中信号方法过早调用的竞争条件
  3. 我希望在重置工作流后能够安全地重新应用信号在历史记录中提前出现
  4. 我想要确保在处理信号之前工作流不会提前完成。
1个回答

4
  1. 保证按顺序一个接一个地进行FIFO处理
  2. 处理signalWithStart的“竞争条件”,其中signal方法调用得太早。或者在现实中,使用常规信号而没有signalWithStart时,在工作流准备好处理之前信号可能会过早到来。
  3. 安全地重置工作流。重置后,信号可以在历史记录中提前重新应用
  4. 确保在信号被处理之前工作流不会提前完成
  5. 为了避免竞争条件,可以使用相同的Queue队列存储所有信号,并使用instance of和casting

这些是在Cadence/Temporal工作流中使用信号时最常见的错误。

有一种设计模式可以应用于同时解决所有问题。

这个想法是简化信号处理程序,始终将信号放入队列中,工作流方法将启动另一个工作流线程来处理队列。

这是基于样例 (Cadence& Temporal) 的。

Java

public class MyWorkflow{
   private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); 

   public void mySignalMethod(SignalRequest req){
       signalRequestQueue.add(req);
   }

   public Output myWorkflwMethod(Input input){
      //1. do everything necessary/needed before actually processing a signal
      ...

      //2. spin up a workflow thread to process 
      Async.procedure(
      () -> {
          while (true) {
              Workflow.await(() -> !signalRequestQueue.isEmpty());
              final SignalRequest request = signalRequestQueue.poll();
              processSignal(request);
          }
      });


      //3. always wait for queue to be empty before completing/failing/continueAsNew the workflow
      Workflow.await(() -> signalRequestQueue.isEmpty());
      return output
   }

   private void processSignal(request){
     // do your actual processing here. 
     // If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
     ...
   }
}

将现有代码迁移到这个模式

您应该使用版本控制进行迁移。

假设您已经有了像这样的现有代码;

public class MyWorkflow{
   public Output myWorkflwMethod(Input input){
      ...
   }

   public void mySignalMethod(request){
     // do your actual processing here. 
     ...
   }
}

然后你应该像下面这样使用版本控制:

public class MyWorkflow{
   private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); 

   public void mySignalMethod(SignalRequest req){
       int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
       if( version == 1){
          signalRequestQueue.add(req);
       }else{
          processSignal(req);
       }
   }

   public Output myWorkflwMethod(Input input){
      //1. do everything necessary/needed before actually processing a signal
      ...

       int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
       if( version == 1){
         //2. spin up a workflow thread to process 
         Async.procedure(
         () -> {
             while (true) {
                 Workflow.await(() -> !signalRequestQueue.isEmpty());
                 final SignalRequest request = signalRequestQueue.poll();
                 processSignal(request);
             }
         });
       }

      //3. always wait for queue to be empty before completing/failing/continueAsNeww the workflow
      Workflow.await(() -> signalRequestQueue.isEmpty());
      return output
   }

   private void processSignal(request){
     // do your actual processing here. 
     // If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
     ...
   }
}

Golang

Golang SDK没有1/2/3的同样问题。这是因为Golang SDK提供了一个完全不同的API来处理信号。

与其将信号方法定义为处理程序,Golang SDK要求工作流程监听通道以处理信号,这正是本答案建议在Java中执行的操作。请参见如何使用信号API的示例(请参见Cadence / Temporal)。

但它有第4个问题--工作流可能会在信号被处理之前提前完成。这是Golang SDK的常见错误。

建议始终在完成或 continueAsNew 工作流之前排空信号通道。 查看 Golang 中如何排空信号通道的示例
这类似于在 Java 中使用 Workflow.await 等待所有信号被处理。但是,由于该通道没有获取大小的 API,我们必须使用“默认”分支来检查是否为空。
感谢 @Maxim 指出 Temporal go sdk 中的 API -- 或者,可以使用 Temporal go-sdk 中的 "HasPending" API 来检查是否消耗了所有信号。
此外,建议监控 "unhandledSignal" metric

我想补充一下,Temporal Go SDK有Selector.HasPending方法,在大多数情况下可以消除显式排空信号通道的需要。 - Maxim Fateev
谢谢Maxim。我已将其添加到我的答案中。 - Long Quanzheng

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接