# OSS.PipeLine **Repository Path**: osscore/OSS.PipeLine ## Basic Information - **Project Name**: OSS.PipeLine - **Description**: 流式事件处理,微服务下业务生命周期管理,强化业务的流程管理,建立业务操作边界,打造标准化的业务执行单元,提高代码复用。 - **Primary Language**: Unknown - **License**: GPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 4 - **Forks**: 3 - **Created**: 2021-06-01 - **Last Updated**: 2026-01-23 ## Categories & Tags **Categories**: process-engine **Tags**: None ## README # OSS 流程引擎与数据管道库 ## 项目简介 - **OSS.DataPipe**:轻量级消息管道,抽象定义消息输入输出接口,简化业务侧消息生产、消费调用方式。
内部提供基于内存队列的默认实现(可扩展自定义消息队列实现)。
同时提供事件处理器,用以支持自定义事件重试策略。 - **OSS.Pipeline**:轻量级流程引擎,提供Activity、Branch等多种组件、支持自定义流程监控等功能。
组件间的消息传递基于 OSS.DataPipe 实现,进而支持组件的自动重试。 ## 快速开始 ### 安装 Install-Package OSS.Pipeline Install-Package OSS.DataPipe ### 简单示例 #### OSS.DataPipe 示例 ```csharp using OSS.DataPipe; // 注入数据消费方法,并得到数据输入句柄 var dataInputHandler = DataPipeFactory.CreateInput(async (data) => { Console.WriteLine($"处理数据: {data}"); return true; // 返回 true 表示处理成功 }); // 通过数据输入句柄推送数据 await dataInputHandler.Push("测试数据"); // 等待测试数据消费完成 await Task.Delay(1000); // 如果不再需要继续生产消息,通过此方法释放资源 DataPipeFactory.Release(dataInputHandler); ``` #### OSS.Pipeline 示例 ```csharp using OSS.Pipeline; // 创建活动组件 var startActivity = new SimpleActivity("Start", async (input) => { Console.WriteLine($"处理输入: {input}"); return new ExecutedResult(true, $"处理后: {input}"); }); // 链式添加后续组件 var endActivity = startActivity.Append("End", async (input) => { Console.WriteLine($"最终处理: {input}"); return new ExecutedResult(true, $"完成: {input}"); }); // 定义流程 var pipeline = PipeLineFactory.Define("MyPipeline", startActivity, endActivity); // 执行流程 await startActivity.Run("测试数据"); ``` ## 架构概览 ### 内部消息传递 ```mermaid flowchart LR subgraph Pipeline001 PA[活动/分支组件1] PB[活动组件2] --> PC[分支组件] PC PD[活动/分支组件3] end subgraph Pipe1 DA[数据输入] --> DC[消息队列] DC --> DB[数据输出] end subgraph DataPipe2 DD[数据输入] --> DE[消息队列] DE --> DF[数据输出] end PA --> DA DB --> PB PC --> DD DF --> PD ``` ### 流程示例图 ```mermaid flowchart TD A[创建订单] --> B[扣减库存] B --> C[审核订单] C --> D{审核结果} D -->|通过| E[发送邮件通知] D -->|拒绝| F[取消订单] subgraph 分支流程 E --> G[更新订单状态] F --> H[恢复库存] end G --> I[流程结束] H --> I ``` ## 详细文档 ### OSS.Pipeline - [快速入门](docs/pipeline/quickstart.md) - [核心组件](docs/pipeline/core-components.md) - [高级特性](docs/pipeline/advanced-features.md) ### OSS.DataPipe - [快速入门](docs/datapipe/quickstart.md) - [核心组件](docs/datapipe/core-components.md) - [高级特性](docs/datapipe/advanced-features.md) ## 扩展指南 ### 自定义集成 RabbitMQ OSS.DataPipe 支持自定义数据输入提供器,您可以轻松集成 RabbitMQ 等消息队列。 详细指南请参考:[RabbitMQ 集成指南](docs/extensions/rabbitmq-integration.md) ### 自定义流程监控 您可以实现 `IPipeMonitor` 接口来自定义流程监控逻辑,例如将监控数据发送到 ELK 或 Prometheus。 ```csharp public class MyPipelineMonitor : IPipeMonitor { public Task Monitor(MonitorDataItem data) { // 自定义监控逻辑 Console.WriteLine($"Pipeline Monitor: {data.Stage} - {data.PipeCode}"); return Task.CompletedTask; } } // 使用自定义监控 pipeline.SetMonitor(new MyPipelineMonitor()); ``` ## 应用场景 ### OSS.Pipeline 应用场景 - 订单处理流程 - 审批流程 - 数据同步流程 - 任务调度系统 - 事件驱动架构 ### OSS.DataPipe 应用场景 - 消息队列 - 事件驱动架构 - 可靠任务执行 - 数据流转处理 - 异步处理 ## 示例项目 查看 [Tests/OSS.Pipeline.ConsoleTests](https://github.com/your-repo/OSS.Pipeline/tree/main/src/Tests/OSS.Pipeline.ConsoleTests) 目录获取完整示例代码。 ## 许可证 MIT License ## 贡献 欢迎提交 Issue 和 Pull Request!