# AProcess **Repository Path**: qq2820/AProcess ## Basic Information - **Project Name**: AProcess - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-10-17 - **Last Updated**: 2025-10-18 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # C++ Process & Task 管理框架 一个基于XML配置的流程管理和动态Task插件框架,支持可视化流程编排和热插拔任务执行。 ## 项目概述 ### 核心特性 - **XML流程定义**: 通过XML文件定义业务流程结构和执行逻辑 - **动态Task管理**: 支持运行时动态加载和执行Task插件 - **多种网关支持**: 排他网关、包容网关、并行网关 - **并发执行**: 支持多个流程实例和任务的并行执行 - **事件监听**: 完整的流程和任务执行事件回调机制 - **热插拔**: 支持动态库的热加载和卸载 ### 架构设计 ``` ┌─────────────────────────────────────────┐ │ Application Layer │ │ (使用框架的应用程序) │ └─────────────┬───────────────────────────┘ │ ┌─────────────▼───────────────────────────┐ │ Process Engine │ │ - 流程定义管理 │ │ - 流程实例管理 │ │ - 流程执行调度 │ └─────────────┬───────────────────────────┘ │ ┌─────────────▼───────────────────────────┐ │ Process Manager │ │ - XML流程定义解析 │ │ - 流程实例创建 │ │ - 网关条件处理 │ └─────────────┬───────────────────────────┘ │ ┌─────────────▼───────────────────────────┐ │ Task Manager │ │ - 任务注册表 │ │ - 任务创建工厂 │ │ - 任务生命周期管理 │ └─────────────┬───────────────────────────┘ │ ┌─────────────▼───────────────────────────┐ │ Dynamic Library Loader │ │ - 动态库加载/卸载 │ │ - 符号解析 │ └─────────────┬───────────────────────────┘ │ ┌─────────────▼───────────────────────────┐ │ Task Plugins (*.so/*.dll) │ │ - 任务实现 │ │ - 导出工厂函数 │ └─────────────────────────────────────────┘ ``` ## 快速开始 ### 构建项目 ```bash # 创建构建目录 mkdir build && cd build # 配置CMake cmake .. # 编译 make # 运行测试 ./test_basic ./test_concurrent_tasks ``` ### 基本使用 ```cpp #include "process/ProcessManager.h" #include "process/ProcessEngine.h" #include "task/TaskManager.h" int main() { // 获取管理器实例 auto& processManager = ProcessManager::getInstance(); auto& processEngine = ProcessEngine::getInstance(); auto& taskManager = task::TaskManager::getInstance(); // 初始化引擎 processEngine.initialize(); // 加载任务插件 taskManager.loadLibrariesFromDirectory("./plugins"); // 加载流程定义 auto defId = processManager.loadProcessFromXML("./processes/my-process.xml"); // 创建流程实例 auto instanceId = processManager.createProcessInstance(defId); // 设置参数并启动流程 ProcessParams params; params.setInt("amount", 1000); params.setString("customerName", "John Doe"); processManager.startProcess(instanceId, params); // 等待流程完成 while (processManager.getProcessStatus(instanceId).state == ProcessState::RUNNING) { processEngine.update(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } // 关闭引擎 processEngine.shutdown(); return 0; } ``` ## 设计Process流程 ### XML流程定义格式 Process通过XML文件定义,支持以下节点类型: - **开始节点** (`startEvent`): 流程起点 - **结束节点** (`endEvent`): 流程终点 - **任务节点** (`task`): 执行具体业务逻辑 - **排他网关** (`exclusiveGateway`): 基于条件选择一条路径 - **包容网关** (`inclusiveGateway`): 基于条件选择多条路径 - **并行网关** (`parallelGateway`): 无条件并行执行所有路径 ### 基本流程示例 ```xml 处理客户订单的业务流程 ``` ### 网关类型说明 #### 1. 排他网关 (Exclusive Gateway) - 基于条件选择**一条**路径执行 - 条件表达式格式: `${variable > value}` - 如果没有满足条件的路径,选择默认路径(无条件路径) #### 2. 包容网关 (Inclusive Gateway) - 基于条件选择**多条**路径执行 - 所有满足条件的路径都会并行执行 - 如果没有满足条件的路径,选择默认路径 #### 3. 并行网关 (Parallel Gateway) - **无条件**地并行执行**所有**出口路径 - 用于实现真正的并行任务执行 ### 并行流程示例 ```xml ``` ## 添加自定义Task ### Task基类接口 所有自定义Task必须继承 `ITask` 基类: ```cpp class ITask { public: virtual ~ITask() = default; // 基本信息 virtual const std::string& getName() const = 0; virtual const std::string& getVersion() const = 0; virtual const std::string& getDescription() const = 0; // 生命周期管理 virtual bool initialize(const TaskParams& params) = 0; virtual void cleanup() = 0; // 任务执行 virtual TaskResult execute(const TaskContext& context) = 0; virtual void cancel() = 0; // 状态查询 virtual TaskState getState() const = 0; virtual float getProgress() const = 0; virtual std::string getLastError() const = 0; }; ``` ### 创建自定义Task #### 方式1: 直接注册Task ```cpp #include "task/ITask.h" #include "task/TaskTypes.h" #include "Logger.h" class MyCustomTask : public task::ITask { private: std::string name_ = "MyCustomTask"; std::string version_ = "1.0.0"; std::string description_ = "我的自定义任务"; task::TaskState state_ = task::TaskState::IDLE; float progress_ = 0.0f; public: const std::string& getName() const override { return name_; } const std::string& getVersion() const override { return version_; } const std::string& getDescription() const override { return description_; } bool initialize(const task::TaskParams& params) override { state_ = task::TaskState::IDLE; LOG_INFO("[MyCustomTask] Initialized"); return true; } void cleanup() override { state_ = task::TaskState::IDLE; LOG_INFO("[MyCustomTask] Cleaned up"); } task::TaskResult execute(const task::TaskContext& context) override { state_ = task::TaskState::RUNNING; LOG_INFO("[MyCustomTask] Starting execution"); // 模拟任务执行 for (int i = 0; i <= 10; ++i) { if (context.cancelRequested) { state_ = task::TaskState::CANCELLED; return task::TaskResult::failureResult("Task cancelled", task::TaskErrorCode::CANCELLED); } progress_ = static_cast(i) / 10.0f; // 更新进度回调 if (context.progressCallback) { context.progressCallback(progress_); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } state_ = task::TaskState::COMPLETED; progress_ = 1.0f; task::TaskResult result; result.success = true; result.message = "MyCustomTask completed successfully"; result.outputData["execution_count"] = 1; result.outputData["custom_data"] = std::string("hello world"); LOG_INFO("[MyCustomTask] Execution completed"); return result; } void cancel() override { state_ = task::TaskState::CANCELLED; LOG_INFO("[MyCustomTask] Cancellation requested"); } task::TaskState getState() const override { return state_; } float getProgress() const override { return progress_; } std::string getLastError() const override { return ""; } }; // 注册Task auto& taskManager = task::TaskManager::getInstance(); taskManager.registerTask("MyCustomTask", []() { return std::make_shared(); }); ``` #### 方式2: 创建动态库Task插件 创建动态库项目,实现Task并导出标准接口: **MyTask.h** ```cpp #pragma once #include "task/ITask.h" #include "task/TaskTypes.h" class MyTask : public task::ITask { public: const std::string& getName() const override; const std::string& getVersion() const override; const std::string& getDescription() const override; bool initialize(const task::TaskParams& params) override; task::TaskResult execute(const task::TaskContext& context) override; void cleanup() override; void cancel() override; task::TaskState getState() const override; float getProgress() const override; std::string getLastError() const override; private: std::string name_ = "MyTask"; std::string version_ = "1.0.0"; std::string description_ = "My custom task from dynamic library"; task::TaskState state_ = task::TaskState::IDLE; float progress_ = 0.0f; bool cancelled_ = false; }; ``` **MyTask.cpp** ```cpp #include "MyTask.h" #include "task/TaskRegistrar.h" #include "Logger.h" const std::string& MyTask::getName() const { return name_; } const std::string& MyTask::getVersion() const { return version_; } const std::string& MyTask::getDescription() const { return description_; } bool MyTask::initialize(const task::TaskParams& params) { state_ = task::TaskState::IDLE; cancelled_ = false; LOG_INFO("[MyTask] Initialized"); return true; } void MyTask::cleanup() { state_ = task::TaskState::IDLE; LOG_INFO("[MyTask] Cleaned up"); } task::TaskResult MyTask::execute(const task::TaskContext& context) { state_ = task::TaskState::RUNNING; LOG_INFO("[MyTask] Starting execution"); // 处理输入参数 auto it = context.inputParams.stringParams.find("input_data"); if (it != context.inputParams.stringParams.end()) { LOG_INFO("[MyTask] Input data: ", it->second); } // 模拟任务执行 for (int i = 0; i <= 5; ++i) { if (cancelled_ || context.cancelRequested) { state_ = task::TaskState::CANCELLED; return task::TaskResult::failureResult("Task cancelled", task::TaskErrorCode::CANCELLED); } progress_ = static_cast(i) / 5.0f; if (context.progressCallback) { context.progressCallback(progress_); } std::this_thread::sleep_for(std::chrono::milliseconds(200)); } state_ = task::TaskState::COMPLETED; progress_ = 1.0f; task::TaskResult result; result.success = true; result.message = "MyTask completed successfully"; result.outputData["processed"] = true; result.outputData["timestamp"] = std::chrono::system_clock::now(); LOG_INFO("[MyTask] Execution completed"); return result; } void MyTask::cancel() { cancelled_ = true; state_ = task::TaskState::CANCELLED; LOG_INFO("[MyTask] Cancellation requested"); } task::TaskState MyTask::getState() const { return state_; } float MyTask::getProgress() const { return progress_; } std::string MyTask::getLastError() const { return ""; } // 导出接口 extern "C" { void registerTasks(task::TaskRegistrar* registrar) { registrar->registerTask("MyTask", []() { return std::make_shared(); }); } const char* getPluginName() { return "MyTaskPlugin"; } const char* getPluginVersion() { return "1.0.0"; } bool pluginInitialize() { LOG_INFO("[MyTaskPlugin] Plugin initialized"); return true; } void pluginCleanup() { LOG_INFO("[MyTaskPlugin] Plugin cleaned up"); } } ``` **CMakeLists.txt (动态库)** ```cmake cmake_minimum_required(VERSION 3.10) project(MyTaskPlugin) set(CMAKE_CXX_STANDARD 17) # 创建动态库 add_library(mytask_plugin SHARED MyTask.cpp) # 链接依赖 target_include_directories(mytask_plugin PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../include ) # 设置输出目录 set_target_properties(mytask_plugin PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../plugins ) ``` ### 使用宏简化Task注册 框架提供了宏来简化Task注册: ```cpp #include "task/TaskRegistrar.h" // 使用宏注册Task DECLARE_PLUGIN("MyPlugin", "1.0.0") BEGIN_TASK_REGISTRATION("MyPlugin") REGISTER_TASK(registrar, "MyTask1", MyTask1) REGISTER_TASK(registrar, "MyTask2", MyTask2) REGISTER_TASK_DEFAULT(registrar, "MyTask3", MyTask3) END_TASK_REGISTRATION() ``` ## 高级功能 ### 事件监听 框架提供了完整的事件监听机制,可以监控流程和任务的执行状态: ```cpp class MyProcessListener : public ProcessEventListener { public: void onProcessStarted(ProcessInstanceId instanceId) override { LOG_INFO("Process started: ", instanceId); } void onProcessCompleted(ProcessInstanceId instanceId) override { LOG_INFO("Process completed: ", instanceId); } void onNodeActivated(ProcessInstanceId instanceId, const std::string& nodeId) override { LOG_DEBUG("Node activated: ", nodeId, " in process ", instanceId); } void onNodeCompleted(ProcessInstanceId instanceId, const std::string& nodeId) override { LOG_DEBUG("Node completed: ", nodeId, " in process ", instanceId); } void onGatewayEvaluated(ProcessInstanceId instanceId, const std::string& gatewayId, const std::vector& selectedPaths) override { LOG_INFO("Gateway evaluated: ", gatewayId, " selected paths: ", selectedPaths.size()); } }; // 注册事件监听器 auto listener = std::make_shared(); processManager.addProcessEventListener(listener.get()); processEngine.addProcessEventListener(listener.get()); ``` ### 并发执行 框架支持多个流程实例和任务的并发执行: ```cpp // 设置最大并发流程数 processEngine.setMaxConcurrentProcesses(10); // 设置更新间隔 processEngine.setUpdateInterval(std::chrono::milliseconds(50)); // 创建多个流程实例 std::vector instances; for (int i = 0; i < 5; ++i) { auto instanceId = processManager.createProcessInstance(defId); instances.push_back(instanceId); ProcessParams params; params.setInt("task_id", i); processManager.startProcess(instanceId, params); } ``` ### 参数传递 支持多种类型的参数传递: ```cpp ProcessParams params; // 字符串参数 params.setString("customerName", "John Doe"); // 数值参数 params.setInt("orderAmount", 1000); params.setDouble("price", 99.99); // 布尔参数 params.setBool("approved", true); // 自定义参数 params.setCustom("customData", std::vector{1, 2, 3}); ``` ## 最佳实践 ### Process设计最佳实践 1. **保持流程简洁**: 每个流程应该专注于一个特定的业务目标 2. **合理使用网关**: - 使用排他网关进行条件分支 - 使用包容网关处理多条件并行 - 使用并行网关实现真正的并行执行 3. **命名规范**: 使用有意义的节点ID和名称 4. **错误处理**: 在关键节点添加错误处理逻辑 ### Task开发最佳实践 1. **单一职责**: 每个Task应该只负责一个具体的功能 2. **状态管理**: 正确管理Task的生命周期状态 3. **错误处理**: 提供清晰的错误信息和错误码 4. **资源管理**: 在initialize和cleanup中正确管理资源 5. **进度报告**: 对于长时间运行的Task,定期报告执行进度 ### 性能优化 1. **Task复用**: 对于无状态的Task,可以复用实例 2. **并发控制**: 合理设置最大并发流程数 3. **内存管理**: 及时清理完成的流程实例 4. **动态库缓存**: 对于频繁使用的Task插件,考虑缓存机制 ## 故障排除 ### 常见问题 1. **Task未找到** - 检查Task名称是否正确 - 确认动态库已正确加载 - 检查导出函数是否正确实现 2. **流程执行卡住** - 检查网关条件是否正确 - 确认所有节点都有正确的连接 - 检查是否有循环依赖 3. **内存泄漏** - 确保Task在cleanup中释放资源 - 及时卸载不再使用的流程定义 - 定期调用ProcessManager的cleanup方法 ### 调试技巧 1. **启用详细日志** ```cpp // 在main函数开始处设置日志级别 Logger::setLevel(LogLevel::DEBUG); ``` 2. **使用事件监听器跟踪执行** ```cpp // 添加详细的事件监听器来跟踪每个步骤 ``` 3. **检查流程状态** ```cpp auto status = processManager.getProcessStatus(instanceId); std::cout << "Process status: " << status.toString() << std::endl; ``` ## API参考 ### ProcessManager 主要方法 - `loadProcessFromXML(const std::string& xmlPath)`: 从XML文件加载流程定义 - `loadProcessFromXMLString(const std::string& xmlContent)`: 从XML字符串加载流程定义 - `createProcessInstance(ProcessDefinitionId defId)`: 创建流程实例 - `startProcess(ProcessInstanceId instanceId, const ProcessParams& params)`: 启动流程 - `getProcessStatus(ProcessInstanceId instanceId)`: 获取流程状态 - `getRunningInstances()`: 获取运行中的流程实例列表 ### TaskManager 主要方法 - `loadLibrary(const std::string& path)`: 加载动态库 - `loadLibrariesFromDirectory(const std::string& dir)`: 从目录加载所有动态库 - `createTask(const std::string& name)`: 创建Task实例 - `getTaskList()`: 获取可用Task列表 - `hasTask(const std::string& name)`: 检查Task是否存在 ### ProcessEngine 主要方法 - `initialize()`: 初始化引擎 - `shutdown()`: 关闭引擎 - `update()`: 更新引擎状态(需要在主循环中调用) - `setMaxConcurrentProcesses(size_t maxProcesses)`: 设置最大并发流程数 - `addProcessEventListener(ProcessEventListener* listener)`: 添加事件监听器 ## 贡献指南 1. **代码规范**: 遵循项目的代码风格 2. **测试**: 为新功能添加相应的测试用例 3. **文档**: 更新相关文档 4. **提交信息**: 使用清晰的提交信息 ## 许可证 本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。 ## 支持 如有问题或建议,请通过以下方式联系: - 创建 Issue - 提交 Pull Request - 发送邮件至项目维护者 --- *最后更新: 2025-10-18*