# OSS.PipeLine
**Repository Path**: osscore/OSS.PipeLine
## Basic Information
- **Project Name**: OSS.PipeLine
- **Description**: 流式事件处理,微服务下业务生命周期管理,强化业务的流程管理,建立业务操作边界,打造标准化的业务执行单元,提高代码复用。
- **Primary Language**: Unknown
- **License**: GPL-3.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 4
- **Forks**: 3
- **Created**: 2021-06-01
- **Last Updated**: 2026-01-23
## Categories & Tags
**Categories**: process-engine
**Tags**: None
## README
# OSS 流程引擎与数据管道库
## 项目简介
- **OSS.DataPipe**:轻量级消息管道,抽象定义消息输入输出接口,简化业务侧消息生产、消费调用方式。
内部提供基于内存队列的默认实现(可扩展自定义消息队列实现)。
同时提供事件处理器,用以支持自定义事件重试策略。
- **OSS.Pipeline**:轻量级流程引擎,提供Activity、Branch等多种组件、支持自定义流程监控等功能。
组件间的消息传递基于 OSS.DataPipe 实现,进而支持组件的自动重试。
## 快速开始
### 安装
Install-Package OSS.Pipeline
Install-Package OSS.DataPipe
### 简单示例
#### OSS.DataPipe 示例
```csharp
using OSS.DataPipe;
// 注入数据消费方法,并得到数据输入句柄
var dataInputHandler = DataPipeFactory.CreateInput(async (data) =>
{
Console.WriteLine($"处理数据: {data}");
return true; // 返回 true 表示处理成功
});
// 通过数据输入句柄推送数据
await dataInputHandler.Push("测试数据");
// 等待测试数据消费完成
await Task.Delay(1000);
// 如果不再需要继续生产消息,通过此方法释放资源
DataPipeFactory.Release(dataInputHandler);
```
#### OSS.Pipeline 示例
```csharp
using OSS.Pipeline;
// 创建活动组件
var startActivity = new SimpleActivity("Start", async (input) =>
{
Console.WriteLine($"处理输入: {input}");
return new ExecutedResult(true, $"处理后: {input}");
});
// 链式添加后续组件
var endActivity = startActivity.Append("End", async (input) =>
{
Console.WriteLine($"最终处理: {input}");
return new ExecutedResult(true, $"完成: {input}");
});
// 定义流程
var pipeline = PipeLineFactory.Define("MyPipeline", startActivity, endActivity);
// 执行流程
await startActivity.Run("测试数据");
```
## 架构概览
### 内部消息传递
```mermaid
flowchart LR
subgraph Pipeline001
PA[活动/分支组件1]
PB[活动组件2] --> PC[分支组件]
PC
PD[活动/分支组件3]
end
subgraph Pipe1
DA[数据输入] --> DC[消息队列]
DC --> DB[数据输出]
end
subgraph DataPipe2
DD[数据输入] --> DE[消息队列]
DE --> DF[数据输出]
end
PA --> DA
DB --> PB
PC --> DD
DF --> PD
```
### 流程示例图
```mermaid
flowchart TD
A[创建订单] --> B[扣减库存]
B --> C[审核订单]
C --> D{审核结果}
D -->|通过| E[发送邮件通知]
D -->|拒绝| F[取消订单]
subgraph 分支流程
E --> G[更新订单状态]
F --> H[恢复库存]
end
G --> I[流程结束]
H --> I
```
## 详细文档
### OSS.Pipeline
- [快速入门](docs/pipeline/quickstart.md)
- [核心组件](docs/pipeline/core-components.md)
- [高级特性](docs/pipeline/advanced-features.md)
### OSS.DataPipe
- [快速入门](docs/datapipe/quickstart.md)
- [核心组件](docs/datapipe/core-components.md)
- [高级特性](docs/datapipe/advanced-features.md)
## 扩展指南
### 自定义集成 RabbitMQ
OSS.DataPipe 支持自定义数据输入提供器,您可以轻松集成 RabbitMQ 等消息队列。
详细指南请参考:[RabbitMQ 集成指南](docs/extensions/rabbitmq-integration.md)
### 自定义流程监控
您可以实现 `IPipeMonitor` 接口来自定义流程监控逻辑,例如将监控数据发送到 ELK 或 Prometheus。
```csharp
public class MyPipelineMonitor : IPipeMonitor
{
public Task Monitor(MonitorDataItem data)
{
// 自定义监控逻辑
Console.WriteLine($"Pipeline Monitor: {data.Stage} - {data.PipeCode}");
return Task.CompletedTask;
}
}
// 使用自定义监控
pipeline.SetMonitor(new MyPipelineMonitor());
```
## 应用场景
### OSS.Pipeline 应用场景
- 订单处理流程
- 审批流程
- 数据同步流程
- 任务调度系统
- 事件驱动架构
### OSS.DataPipe 应用场景
- 消息队列
- 事件驱动架构
- 可靠任务执行
- 数据流转处理
- 异步处理
## 示例项目
查看 [Tests/OSS.Pipeline.ConsoleTests](https://github.com/your-repo/OSS.Pipeline/tree/main/src/Tests/OSS.Pipeline.ConsoleTests) 目录获取完整示例代码。
## 许可证
MIT License
## 贡献
欢迎提交 Issue 和 Pull Request!