如何实现一个带状态机的工作流引擎?

wen java案例 59

本文目录导读:

如何实现一个带状态机的工作流引擎?

  1. 第一部分:核心概念与设计
  2. 第二部分:架构分层
  3. 第三部分:代码实现步骤(Java 示例)
  4. 第四部分:注意事项与优化
  5. 第五部分:总结与扩展

实现一个带状态机的工作流引擎,核心在于将业务流程抽象为状态转换,每个任务或整个流程实例都处于一个明确的状态,只有满足特定条件(事件)时,才能通过执行特定动作(Action)转移到下一个状态。

以下是一个从设计到代码实现(以 Java 为例)的完整指南。

第一部分:核心概念与设计

  1. 状态 (State):流程或节点所处的稳定阶段。待提交审批中已通过已驳回
  2. 事件 (Event):触发状态转换的外部或内部动作。提交申请审批通过驳回
  3. 转换 (Transition):定义了从源状态到目标状态的路径,以及触发的条件。
    • 公式:源状态 + 事件 + 条件(可选) = 目标状态 + 执行动作
  4. 动作 (Action):状态转换时执行的业务逻辑,发送邮件、更新数据库、调用第三方 API。
  5. 上下文 (Context):贯穿整个流程的数据载体,例如申请表、审批意见、用户信息。

第二部分:架构分层

为了保持可维护性和灵活性,建议分层设计:

层次 职责 关键组件
定义层 定义工作流的元数据(状态、事件、转换规则)。 StateMachineDefinitionState, Event
引擎层 核心调度逻辑,解析规则,执行转换。 StateMachineEngineStateMachineExecutor
持久层 存储工作流实例状态、快照、历史记录。 WorkflowInstanceTransitionHistory
业务层 实现具体的 Action,绑定到流程节点。 ActionHandler 接口实现, ConditionEvaluator

第三部分:代码实现步骤(Java 示例)

我们将使用 Spring Boot + JPA 作为基础框架,并避免引入重型外部依赖(如 Activiti),从零搭建。

Step 1:定义枚举与接口

// 1. 定义状态和事件枚举(根据业务调整)
public enum OrderState {
    DRAFT, PENDING_APPROVAL, APPROVED, REJECTED, CLOSED
}
public enum OrderEvent {
    SUBMIT, APPROVE, REJECT, CLOSE
}
// 2. 定义 Action 接口
public interface StateAction {
    void execute(WorkflowInstance instance, Object payload);
}
// 3. 定义条件接口(决定是否能走某条路径)
public interface ConditionEvaluator {
    boolean evaluate(WorkflowInstance instance, OrderEvent event);
}

Step 2:构建状态机定义(DSL 风格)

这是最核心的部分,使用 Builder 模式构建转换图谱。

import java.util.*;
public class StateMachineDefinition<S, E> {
    // 核心存储:源状态 -> (事件 -> 包含目标状态、动作、条件的对象)
    private final Map<S, Map<E, TransitionConfig<S>>> transitions = new HashMap<>();
    // 内部类:配置一条转换
    public static class TransitionConfig<S> {
        private final S targetState;
        private StateAction action;
        private ConditionEvaluator condition;
        public TransitionConfig(S targetState) {
            this.targetState = targetState;
        }
        // getters ...
    }
    // Builder 模式
    public static class Builder<S, E> {
        private StateMachineDefinition<S, E> definition = new StateMachineDefinition<>();
        public Builder<S, E> from(S source) {
            return new BuilderStage<>(this, source);
        }
        public StateMachineDefinition<S, E> build() {
            return definition;
        }
    }
    // 链式调用的中间阶段
    public static class BuilderStage<S, E> {
        private final Builder<S, E> builder;
        private final S source;
        public BuilderStage(Builder<S, E> builder, S source) {
            this.builder = builder;
            this.source = source;
        }
        public OnStage<S, E> on(E event) { return new OnStage<>(builder, source, event); }
    }
    public static class OnStage<S, E> {
        // ... 省略实现细节,最终调用 definition.addTransition(source, event, target, action, condition)
    }
    // 添加一条转换规则
    public void addTransition(S source, E event, S target, StateAction action) {
        transitions.computeIfAbsent(source, k -> new HashMap<>())
                   .put(event, new TransitionConfig<>(target).setAction(action));
    }
    // 获取转换配置
    public TransitionConfig<S> getTransition(S source, E event) {
        return transitions.getOrDefault(source, new HashMap<>()).get(event);
    }
}

Step 3:创建持久化实体

@Entity
@Table(name = "workflow_instance")
public class WorkflowInstance {
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String bizId;         // 业务单据 ID(如订单号)
    private String currentState;  // 当前状态
    private String machineName;   // 状态机名称(用于区分不同流程)
    @OneToMany(mappedBy = "instance")
    private List<TransitionHistory> histories;
    // getters, setters ...
}
@Entity
@Table(name = "transition_history")
public class TransitionHistory {
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String fromState;
    private String toState;
    private String event;
    private String operator;
    private LocalDateTime createdAt;
    @ManyToOne
    private WorkflowInstance instance;
    // getters, setters ...
}

Step 4:核心引擎实现

@Service
public class StateMachineEngine {
    // 假设 machineRegistry 维护了各种流程的定义
    private final Map<String, StateMachineDefinition<OrderState, OrderEvent>> machineRegistry = new HashMap<>();
    @Autowired
    private WorkflowInstanceRepository instanceRepo;
    // 1. 触发事件
    public WorkflowInstance fireEvent(String instanceId, OrderEvent event, Object payload) {
        WorkflowInstance instance = instanceRepo.findById(instanceId)
                .orElseThrow(() -> new RuntimeException("Instance not found"));
        StateMachineDefinition<OrderState, OrderEvent> machine = machineRegistry.get(instance.getMachineName());
        OrderState currentState = OrderState.valueOf(instance.getCurrentState());
        // 2. 获取转换配置
        TransitionConfig<OrderState> config = machine.getTransition(currentState, event);
        if (config == null) {
            throw new IllegalStateException(
                String.format("No transition from state [%s] on event [%s]", currentState, event));
        }
        // 3. 执行条件检查
        if (config.getCondition() != null && !config.getCondition().evaluate(instance, event)) {
            throw new IllegalStateException("Condition not met for transition");
        }
        // 4. **执行状态转换(关键操作)**
        OrderState targetState = config.getTargetState();
        instance.setCurrentState(targetState.name());
        // 5. 执行绑定的 Action
        if (config.getAction() != null) {
            config.getAction().execute(instance, payload);
        }
        // 6. 持久化(注意并发安全)
        saveWithHistory(instance, currentState.name(), targetState.name(), event.name());
        return instance;
    }
    // 使用 @Transactional 保证原子性,并记录历史
    @Transactional
    protected void saveWithHistory(WorkflowInstance instance, String from, String to, String event) {
        TransitionHistory history = new TransitionHistory();
        history.setFromState(from);
        history.setToState(to);
        history.setEvent(event);
        history.setInstance(instance);
        history.setCreatedAt(LocalDateTime.now());
        instance.getHistories().add(history);
        instanceRepo.save(instance);
    }
}

Step 5:定义具体流程与业务 Action

@Component
public class OrderWorkflowConfigurer {
    @PostConstruct
    public void configure() {
        StateMachineDefinition<OrderState, OrderEvent> machine = 
            new StateMachineDefinition.Builder<OrderState, OrderEvent>()
                .from(OrderState.DRAFT)
                    .on(OrderEvent.SUBMIT)
                    .to(OrderState.PENDING_APPROVAL)
                    .withAction(new SubmitAction())   // 提交后执行的动作
                .from(OrderState.PENDING_APPROVAL)
                    .on(OrderEvent.APPROVE)
                    .to(OrderState.APPROVED)
                    .withAction(new NotifyAction())   // 审批通过后通知
                .from(OrderState.PENDING_APPROVAL)
                    .on(OrderEvent.REJECT)
                    .to(OrderState.DRAFT)             // 驳回回到草稿
                .from(OrderState.APPROVED)
                    .on(OrderEvent.CLOSE)
                    .to(OrderState.CLOSED)
                .build();
        // 注册到引擎
        StateMachineRegistry.register("ORDER_FLOW", machine);
    }
}
// 具体 Action 实现
@Component
public class SubmitAction implements StateAction {
    @Override
    public void execute(WorkflowInstance instance, Object payload) {
        System.out.println("订单 " + instance.getBizId() + " 已提交,执行库存预占...");
        // 实际调用库存服务
    }
}

第四部分:注意事项与优化

  1. 并发安全

    • 使用 @Version(乐观锁)或 SELECT ... FOR UPDATE
    • 关键路径:先查询当前状态,再尝试更新,如果状态已被其他线程修改,应抛出异常或重试。
  2. 幂等性

    • 事件处理函数应该可重入,可以通过历史表(TransitionHistory)引入事件唯一 ID,检测重复事件。
  3. 可观测性

    • 记录详细的状态迁移日志、耗时、失败原因。
    • 提供 API 查询当前状态、下一步可执行的事件。
  4. 流程复杂度

    • 子状态机:当节点内部有复杂逻辑时,可以使用子状态机(状态嵌套)。
    • 并行分支:使用 Fork / Join 节点,需要引入令牌(Token)机制。
  5. 性能优化

    • 缓存状态机定义(通常不会变)。
    • 使用异步 Action 执行器,避免阻塞主流程。

第五部分:总结与扩展

场景 实现思路
简单流程(订单、审批流) 上述基于内存的状态机 + JPA 持久化即可,清晰高效。
复杂流程(BPMN 2.0 标准) 建议使用成熟工具库,如 Activiti / Flowable,它们原生支持了并行网关、子流程、表达式脚本(Groovy)等。
分布式工作流 关键点在于 分布式事务,Action 调用了外部服务,需要设计补偿回滚(Saga模式)重试幂等,此时状态机模型应结合事件驱动架构(如 Kafka + CQRS)。

最终建议:如果你的目标是构建一个健壮的生产级工作流,并且团队有精力维护,从零实现状态机引擎可以带来极大的灵活性,但如果业务复杂度高(如需要可视化设计器、支持 BPEL/BPMN 标准),直接基于 FlowableCamunda 进行二次开发会是更高效的选择。

抱歉,评论功能暂时关闭!