# kafka-demo **Repository Path**: createmaker/kafka-demo ## Basic Information - **Project Name**: kafka-demo - **Description**: kafka demo by cursor with zq - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-06 - **Last Updated**: 2025-09-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: Kafka ## README # Kafka 大批量数据处理 Demo 这是一个基于Spring Boot和Kafka的大批量数据处理演示项目,展示了如何高效地生产和消费大量数据。 ## 功能特性 - **批量数据生产**: 支持生成和发送大量用户事件数据 - **批量数据消费**: 支持批量消费和处理Kafka消息 - **异步处理**: 支持异步批量发送,提高吞吐量 - **统计监控**: 提供处理统计信息和事件类型统计 - **REST API**: 提供HTTP接口进行测试和监控 - **性能优化**: 针对大批量数据进行了Kafka配置优化 ## 技术栈 - Java 8 - Spring Boot 2.6.13 - Spring Kafka - Apache Kafka - Jackson (JSON序列化) - Maven ## 项目结构 ``` src/main/java/org/fun/kafkademo/ ├── KafkaDemoApplication.java # 主启动类 ├── config/ │ └── KafkaConfig.java # Kafka配置类 ├── model/ │ ├── UserEvent.java # 用户事件数据模型 │ └── BatchProcessResult.java # 批量处理结果模型 ├── producer/ │ └── BatchProducer.java # 批量生产者 ├── consumer/ │ └── BatchConsumer.java # 批量消费者 └── controller/ └── KafkaBatchController.java # REST控制器 ``` ## 快速开始 ### 1. 环境准备 确保已安装以下环境: - Java 8+ - Maven 3.6+ - Apache Kafka 2.8+ ### 2. 启动Kafka ```bash # 启动Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动Kafka bin/kafka-server-start.sh config/server.properties # 创建主题 bin/kafka-topics.sh --create --topic user-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 ``` ### 3. 运行应用 ```bash # 编译项目 mvn clean compile # 运行应用 mvn spring-boot:run ``` 应用将在 `http://localhost:8080` 启动。 ## API 接口 ### 1. 批量发送用户事件 ```bash # 发送1000条用户事件 curl -X POST "http://localhost:8080/api/kafka/send-batch?count=1000" # 发送10000条用户事件 curl -X POST "http://localhost:8080/api/kafka/send-batch?count=10000" ``` ### 2. 异步批量发送 ```bash # 异步发送5000条用户事件 curl -X POST "http://localhost:8080/api/kafka/send-batch-async?count=5000" ``` ### 3. 发送单个事件 ```bash curl -X POST "http://localhost:8080/api/kafka/send-single" \ -H "Content-Type: application/json" \ -d '{ "userId": "user_123", "eventType": "LOGIN", "eventData": "User logged in", "sessionId": "session_456", "timestamp": "2024-01-01 12:00:00", "ipAddress": "192.168.1.100", "userAgent": "Mozilla/5.0..." }' ``` ### 4. 获取统计信息 ```bash # 获取处理统计 curl -X GET "http://localhost:8080/api/kafka/stats" # 重置统计信息 curl -X POST "http://localhost:8080/api/kafka/reset-stats" ``` ### 5. 健康检查 ```bash # 健康检查 curl -X GET "http://localhost:8080/api/kafka/health" # API信息 curl -X GET "http://localhost:8080/api/kafka/info" ``` ## 性能优化配置 ### 生产者优化 - **批量大小**: 16KB (`batch.size=16384`) - **延迟时间**: 5ms (`linger.ms=5`) - **缓冲区**: 32MB (`buffer.memory=33554432`) - **压缩**: Snappy压缩 (`compression.type=snappy`) - **幂等性**: 启用 (`enable.idempotence=true`) ### 消费者优化 - **批量拉取**: 500条记录 (`max.poll.records=500`) - **并发消费**: 3个消费者线程 - **手动提交**: 确保消息处理完成后再提交偏移量 - **批量监听**: 启用批量消息监听 ## 监控和调试 ### 1. 查看Kafka主题 ```bash # 查看主题列表 bin/kafka-topics.sh --list --bootstrap-server localhost:9092 # 查看主题详情 bin/kafka-topics.sh --describe --topic user-events --bootstrap-server localhost:9092 ``` ### 2. 消费消息 ```bash # 从头开始消费消息 bin/kafka-console-consumer.sh --topic user-events --from-beginning --bootstrap-server localhost:9092 # 消费最新消息 bin/kafka-console-consumer.sh --topic user-events --bootstrap-server localhost:9092 ``` ### 3. 查看日志 应用日志会显示详细的处理信息,包括: - 批量发送进度 - 消费处理统计 - 错误和异常信息 - 性能指标 ## 扩展功能 ### 1. 添加新的事件类型 在 `BatchProducer.java` 中的 `EVENT_TYPES` 数组添加新的事件类型。 ### 2. 自定义处理逻辑 在 `BatchConsumer.java` 中的 `processUserEvent` 方法添加自定义的业务处理逻辑。 ### 3. 添加更多监控指标 可以集成Micrometer、Prometheus等监控工具来收集更详细的性能指标。 ## 注意事项 1. **内存使用**: 大批量处理时注意JVM内存配置 2. **网络带宽**: 确保Kafka集群有足够的网络带宽 3. **磁盘空间**: 确保有足够的磁盘空间存储消息 4. **错误处理**: 生产环境中需要完善的错误处理和重试机制 5. **监控告警**: 建议添加监控和告警机制 ## 故障排除 ### 常见问题 1. **连接失败**: 检查Kafka服务是否启动,端口是否正确 2. **序列化错误**: 检查JSON序列化配置和模型类 3. **内存不足**: 调整JVM堆内存大小 4. **消费延迟**: 检查消费者配置和并发设置 ### 日志级别 可以通过修改 `application.properties` 中的日志配置来调整日志级别: ```properties logging.level.org.fun.kafkademo=DEBUG logging.level.org.apache.kafka=WARN ``` ## 许可证 MIT License