# AProcess
**Repository Path**: qq2820/AProcess
## Basic Information
- **Project Name**: AProcess
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-10-17
- **Last Updated**: 2025-10-18
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# C++ Process & Task 管理框架
一个基于XML配置的流程管理和动态Task插件框架,支持可视化流程编排和热插拔任务执行。
## 项目概述
### 核心特性
- **XML流程定义**: 通过XML文件定义业务流程结构和执行逻辑
- **动态Task管理**: 支持运行时动态加载和执行Task插件
- **多种网关支持**: 排他网关、包容网关、并行网关
- **并发执行**: 支持多个流程实例和任务的并行执行
- **事件监听**: 完整的流程和任务执行事件回调机制
- **热插拔**: 支持动态库的热加载和卸载
### 架构设计
```
┌─────────────────────────────────────────┐
│ Application Layer │
│ (使用框架的应用程序) │
└─────────────┬───────────────────────────┘
│
┌─────────────▼───────────────────────────┐
│ Process Engine │
│ - 流程定义管理 │
│ - 流程实例管理 │
│ - 流程执行调度 │
└─────────────┬───────────────────────────┘
│
┌─────────────▼───────────────────────────┐
│ Process Manager │
│ - XML流程定义解析 │
│ - 流程实例创建 │
│ - 网关条件处理 │
└─────────────┬───────────────────────────┘
│
┌─────────────▼───────────────────────────┐
│ Task Manager │
│ - 任务注册表 │
│ - 任务创建工厂 │
│ - 任务生命周期管理 │
└─────────────┬───────────────────────────┘
│
┌─────────────▼───────────────────────────┐
│ Dynamic Library Loader │
│ - 动态库加载/卸载 │
│ - 符号解析 │
└─────────────┬───────────────────────────┘
│
┌─────────────▼───────────────────────────┐
│ Task Plugins (*.so/*.dll) │
│ - 任务实现 │
│ - 导出工厂函数 │
└─────────────────────────────────────────┘
```
## 快速开始
### 构建项目
```bash
# 创建构建目录
mkdir build && cd build
# 配置CMake
cmake ..
# 编译
make
# 运行测试
./test_basic
./test_concurrent_tasks
```
### 基本使用
```cpp
#include "process/ProcessManager.h"
#include "process/ProcessEngine.h"
#include "task/TaskManager.h"
int main() {
// 获取管理器实例
auto& processManager = ProcessManager::getInstance();
auto& processEngine = ProcessEngine::getInstance();
auto& taskManager = task::TaskManager::getInstance();
// 初始化引擎
processEngine.initialize();
// 加载任务插件
taskManager.loadLibrariesFromDirectory("./plugins");
// 加载流程定义
auto defId = processManager.loadProcessFromXML("./processes/my-process.xml");
// 创建流程实例
auto instanceId = processManager.createProcessInstance(defId);
// 设置参数并启动流程
ProcessParams params;
params.setInt("amount", 1000);
params.setString("customerName", "John Doe");
processManager.startProcess(instanceId, params);
// 等待流程完成
while (processManager.getProcessStatus(instanceId).state == ProcessState::RUNNING) {
processEngine.update();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 关闭引擎
processEngine.shutdown();
return 0;
}
```
## 设计Process流程
### XML流程定义格式
Process通过XML文件定义,支持以下节点类型:
- **开始节点** (`startEvent`): 流程起点
- **结束节点** (`endEvent`): 流程终点
- **任务节点** (`task`): 执行具体业务逻辑
- **排他网关** (`exclusiveGateway`): 基于条件选择一条路径
- **包容网关** (`inclusiveGateway`): 基于条件选择多条路径
- **并行网关** (`parallelGateway`): 无条件并行执行所有路径
### 基本流程示例
```xml
处理客户订单的业务流程
```
### 网关类型说明
#### 1. 排他网关 (Exclusive Gateway)
- 基于条件选择**一条**路径执行
- 条件表达式格式: `${variable > value}`
- 如果没有满足条件的路径,选择默认路径(无条件路径)
#### 2. 包容网关 (Inclusive Gateway)
- 基于条件选择**多条**路径执行
- 所有满足条件的路径都会并行执行
- 如果没有满足条件的路径,选择默认路径
#### 3. 并行网关 (Parallel Gateway)
- **无条件**地并行执行**所有**出口路径
- 用于实现真正的并行任务执行
### 并行流程示例
```xml
```
## 添加自定义Task
### Task基类接口
所有自定义Task必须继承 `ITask` 基类:
```cpp
class ITask {
public:
virtual ~ITask() = default;
// 基本信息
virtual const std::string& getName() const = 0;
virtual const std::string& getVersion() const = 0;
virtual const std::string& getDescription() const = 0;
// 生命周期管理
virtual bool initialize(const TaskParams& params) = 0;
virtual void cleanup() = 0;
// 任务执行
virtual TaskResult execute(const TaskContext& context) = 0;
virtual void cancel() = 0;
// 状态查询
virtual TaskState getState() const = 0;
virtual float getProgress() const = 0;
virtual std::string getLastError() const = 0;
};
```
### 创建自定义Task
#### 方式1: 直接注册Task
```cpp
#include "task/ITask.h"
#include "task/TaskTypes.h"
#include "Logger.h"
class MyCustomTask : public task::ITask {
private:
std::string name_ = "MyCustomTask";
std::string version_ = "1.0.0";
std::string description_ = "我的自定义任务";
task::TaskState state_ = task::TaskState::IDLE;
float progress_ = 0.0f;
public:
const std::string& getName() const override { return name_; }
const std::string& getVersion() const override { return version_; }
const std::string& getDescription() const override { return description_; }
bool initialize(const task::TaskParams& params) override {
state_ = task::TaskState::IDLE;
LOG_INFO("[MyCustomTask] Initialized");
return true;
}
void cleanup() override {
state_ = task::TaskState::IDLE;
LOG_INFO("[MyCustomTask] Cleaned up");
}
task::TaskResult execute(const task::TaskContext& context) override {
state_ = task::TaskState::RUNNING;
LOG_INFO("[MyCustomTask] Starting execution");
// 模拟任务执行
for (int i = 0; i <= 10; ++i) {
if (context.cancelRequested) {
state_ = task::TaskState::CANCELLED;
return task::TaskResult::failureResult("Task cancelled", task::TaskErrorCode::CANCELLED);
}
progress_ = static_cast(i) / 10.0f;
// 更新进度回调
if (context.progressCallback) {
context.progressCallback(progress_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
state_ = task::TaskState::COMPLETED;
progress_ = 1.0f;
task::TaskResult result;
result.success = true;
result.message = "MyCustomTask completed successfully";
result.outputData["execution_count"] = 1;
result.outputData["custom_data"] = std::string("hello world");
LOG_INFO("[MyCustomTask] Execution completed");
return result;
}
void cancel() override {
state_ = task::TaskState::CANCELLED;
LOG_INFO("[MyCustomTask] Cancellation requested");
}
task::TaskState getState() const override { return state_; }
float getProgress() const override { return progress_; }
std::string getLastError() const override { return ""; }
};
// 注册Task
auto& taskManager = task::TaskManager::getInstance();
taskManager.registerTask("MyCustomTask", []() {
return std::make_shared();
});
```
#### 方式2: 创建动态库Task插件
创建动态库项目,实现Task并导出标准接口:
**MyTask.h**
```cpp
#pragma once
#include "task/ITask.h"
#include "task/TaskTypes.h"
class MyTask : public task::ITask {
public:
const std::string& getName() const override;
const std::string& getVersion() const override;
const std::string& getDescription() const override;
bool initialize(const task::TaskParams& params) override;
task::TaskResult execute(const task::TaskContext& context) override;
void cleanup() override;
void cancel() override;
task::TaskState getState() const override;
float getProgress() const override;
std::string getLastError() const override;
private:
std::string name_ = "MyTask";
std::string version_ = "1.0.0";
std::string description_ = "My custom task from dynamic library";
task::TaskState state_ = task::TaskState::IDLE;
float progress_ = 0.0f;
bool cancelled_ = false;
};
```
**MyTask.cpp**
```cpp
#include "MyTask.h"
#include "task/TaskRegistrar.h"
#include "Logger.h"
const std::string& MyTask::getName() const { return name_; }
const std::string& MyTask::getVersion() const { return version_; }
const std::string& MyTask::getDescription() const { return description_; }
bool MyTask::initialize(const task::TaskParams& params) {
state_ = task::TaskState::IDLE;
cancelled_ = false;
LOG_INFO("[MyTask] Initialized");
return true;
}
void MyTask::cleanup() {
state_ = task::TaskState::IDLE;
LOG_INFO("[MyTask] Cleaned up");
}
task::TaskResult MyTask::execute(const task::TaskContext& context) {
state_ = task::TaskState::RUNNING;
LOG_INFO("[MyTask] Starting execution");
// 处理输入参数
auto it = context.inputParams.stringParams.find("input_data");
if (it != context.inputParams.stringParams.end()) {
LOG_INFO("[MyTask] Input data: ", it->second);
}
// 模拟任务执行
for (int i = 0; i <= 5; ++i) {
if (cancelled_ || context.cancelRequested) {
state_ = task::TaskState::CANCELLED;
return task::TaskResult::failureResult("Task cancelled", task::TaskErrorCode::CANCELLED);
}
progress_ = static_cast(i) / 5.0f;
if (context.progressCallback) {
context.progressCallback(progress_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
state_ = task::TaskState::COMPLETED;
progress_ = 1.0f;
task::TaskResult result;
result.success = true;
result.message = "MyTask completed successfully";
result.outputData["processed"] = true;
result.outputData["timestamp"] = std::chrono::system_clock::now();
LOG_INFO("[MyTask] Execution completed");
return result;
}
void MyTask::cancel() {
cancelled_ = true;
state_ = task::TaskState::CANCELLED;
LOG_INFO("[MyTask] Cancellation requested");
}
task::TaskState MyTask::getState() const { return state_; }
float MyTask::getProgress() const { return progress_; }
std::string MyTask::getLastError() const { return ""; }
// 导出接口
extern "C" {
void registerTasks(task::TaskRegistrar* registrar) {
registrar->registerTask("MyTask", []() {
return std::make_shared();
});
}
const char* getPluginName() {
return "MyTaskPlugin";
}
const char* getPluginVersion() {
return "1.0.0";
}
bool pluginInitialize() {
LOG_INFO("[MyTaskPlugin] Plugin initialized");
return true;
}
void pluginCleanup() {
LOG_INFO("[MyTaskPlugin] Plugin cleaned up");
}
}
```
**CMakeLists.txt (动态库)**
```cmake
cmake_minimum_required(VERSION 3.10)
project(MyTaskPlugin)
set(CMAKE_CXX_STANDARD 17)
# 创建动态库
add_library(mytask_plugin SHARED MyTask.cpp)
# 链接依赖
target_include_directories(mytask_plugin PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/../include
)
# 设置输出目录
set_target_properties(mytask_plugin PROPERTIES
LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../plugins
)
```
### 使用宏简化Task注册
框架提供了宏来简化Task注册:
```cpp
#include "task/TaskRegistrar.h"
// 使用宏注册Task
DECLARE_PLUGIN("MyPlugin", "1.0.0")
BEGIN_TASK_REGISTRATION("MyPlugin")
REGISTER_TASK(registrar, "MyTask1", MyTask1)
REGISTER_TASK(registrar, "MyTask2", MyTask2)
REGISTER_TASK_DEFAULT(registrar, "MyTask3", MyTask3)
END_TASK_REGISTRATION()
```
## 高级功能
### 事件监听
框架提供了完整的事件监听机制,可以监控流程和任务的执行状态:
```cpp
class MyProcessListener : public ProcessEventListener {
public:
void onProcessStarted(ProcessInstanceId instanceId) override {
LOG_INFO("Process started: ", instanceId);
}
void onProcessCompleted(ProcessInstanceId instanceId) override {
LOG_INFO("Process completed: ", instanceId);
}
void onNodeActivated(ProcessInstanceId instanceId, const std::string& nodeId) override {
LOG_DEBUG("Node activated: ", nodeId, " in process ", instanceId);
}
void onNodeCompleted(ProcessInstanceId instanceId, const std::string& nodeId) override {
LOG_DEBUG("Node completed: ", nodeId, " in process ", instanceId);
}
void onGatewayEvaluated(ProcessInstanceId instanceId,
const std::string& gatewayId,
const std::vector& selectedPaths) override {
LOG_INFO("Gateway evaluated: ", gatewayId, " selected paths: ", selectedPaths.size());
}
};
// 注册事件监听器
auto listener = std::make_shared();
processManager.addProcessEventListener(listener.get());
processEngine.addProcessEventListener(listener.get());
```
### 并发执行
框架支持多个流程实例和任务的并发执行:
```cpp
// 设置最大并发流程数
processEngine.setMaxConcurrentProcesses(10);
// 设置更新间隔
processEngine.setUpdateInterval(std::chrono::milliseconds(50));
// 创建多个流程实例
std::vector instances;
for (int i = 0; i < 5; ++i) {
auto instanceId = processManager.createProcessInstance(defId);
instances.push_back(instanceId);
ProcessParams params;
params.setInt("task_id", i);
processManager.startProcess(instanceId, params);
}
```
### 参数传递
支持多种类型的参数传递:
```cpp
ProcessParams params;
// 字符串参数
params.setString("customerName", "John Doe");
// 数值参数
params.setInt("orderAmount", 1000);
params.setDouble("price", 99.99);
// 布尔参数
params.setBool("approved", true);
// 自定义参数
params.setCustom("customData", std::vector{1, 2, 3});
```
## 最佳实践
### Process设计最佳实践
1. **保持流程简洁**: 每个流程应该专注于一个特定的业务目标
2. **合理使用网关**:
- 使用排他网关进行条件分支
- 使用包容网关处理多条件并行
- 使用并行网关实现真正的并行执行
3. **命名规范**: 使用有意义的节点ID和名称
4. **错误处理**: 在关键节点添加错误处理逻辑
### Task开发最佳实践
1. **单一职责**: 每个Task应该只负责一个具体的功能
2. **状态管理**: 正确管理Task的生命周期状态
3. **错误处理**: 提供清晰的错误信息和错误码
4. **资源管理**: 在initialize和cleanup中正确管理资源
5. **进度报告**: 对于长时间运行的Task,定期报告执行进度
### 性能优化
1. **Task复用**: 对于无状态的Task,可以复用实例
2. **并发控制**: 合理设置最大并发流程数
3. **内存管理**: 及时清理完成的流程实例
4. **动态库缓存**: 对于频繁使用的Task插件,考虑缓存机制
## 故障排除
### 常见问题
1. **Task未找到**
- 检查Task名称是否正确
- 确认动态库已正确加载
- 检查导出函数是否正确实现
2. **流程执行卡住**
- 检查网关条件是否正确
- 确认所有节点都有正确的连接
- 检查是否有循环依赖
3. **内存泄漏**
- 确保Task在cleanup中释放资源
- 及时卸载不再使用的流程定义
- 定期调用ProcessManager的cleanup方法
### 调试技巧
1. **启用详细日志**
```cpp
// 在main函数开始处设置日志级别
Logger::setLevel(LogLevel::DEBUG);
```
2. **使用事件监听器跟踪执行**
```cpp
// 添加详细的事件监听器来跟踪每个步骤
```
3. **检查流程状态**
```cpp
auto status = processManager.getProcessStatus(instanceId);
std::cout << "Process status: " << status.toString() << std::endl;
```
## API参考
### ProcessManager 主要方法
- `loadProcessFromXML(const std::string& xmlPath)`: 从XML文件加载流程定义
- `loadProcessFromXMLString(const std::string& xmlContent)`: 从XML字符串加载流程定义
- `createProcessInstance(ProcessDefinitionId defId)`: 创建流程实例
- `startProcess(ProcessInstanceId instanceId, const ProcessParams& params)`: 启动流程
- `getProcessStatus(ProcessInstanceId instanceId)`: 获取流程状态
- `getRunningInstances()`: 获取运行中的流程实例列表
### TaskManager 主要方法
- `loadLibrary(const std::string& path)`: 加载动态库
- `loadLibrariesFromDirectory(const std::string& dir)`: 从目录加载所有动态库
- `createTask(const std::string& name)`: 创建Task实例
- `getTaskList()`: 获取可用Task列表
- `hasTask(const std::string& name)`: 检查Task是否存在
### ProcessEngine 主要方法
- `initialize()`: 初始化引擎
- `shutdown()`: 关闭引擎
- `update()`: 更新引擎状态(需要在主循环中调用)
- `setMaxConcurrentProcesses(size_t maxProcesses)`: 设置最大并发流程数
- `addProcessEventListener(ProcessEventListener* listener)`: 添加事件监听器
## 贡献指南
1. **代码规范**: 遵循项目的代码风格
2. **测试**: 为新功能添加相应的测试用例
3. **文档**: 更新相关文档
4. **提交信息**: 使用清晰的提交信息
## 许可证
本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。
## 支持
如有问题或建议,请通过以下方式联系:
- 创建 Issue
- 提交 Pull Request
- 发送邮件至项目维护者
---
*最后更新: 2025-10-18*