# iot-harbor **Repository Path**: lxrv587/iot-harbor ## Basic Information - **Project Name**: iot-harbor - **Description**: 基于reactor3实现的mqtt库,代码精简。 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 52 - **Forks**: 17 - **Created**: 2019-11-13 - **Last Updated**: 2023-08-02 ## Categories & Tags **Categories**: message-server **Tags**: None ## README # 项目暂停维护 [请使用新项目](https://github.com/quickmsg/smqtt) # MQTT库 [git地址](https://github.com/1ssqq1lxr/iot-harbor) ### 采用技术 > 使用开源reactor-netty库,实现MQTT server。集成了springboot autoconfig实现快速注入容器。 框架采用反应式reactor3库,是代码具有低延迟,高吞吐量等特点。 ### 目前实现功能 - qos 0,1,2完整实现 - 遗嘱消息,保留消息实现 - 客户端重连机制 - 密码校验,以及版本校验 - 支持ssl加密 - spring容器支持 - channel存储,topic存储,保留消息等外部接口支持 - MQTT 协议同时支持WS/TCP 传输,默认MQTT协议打开WS 8443端口 #### 服务端使用说明 ```java RsocketServerSession serverSession=TransportServer.create("192.168.100.237",1884) .auth((s,p)->true) .heart(100000) .protocol(ProtocolType.MQTT) .ssl(false) .auth((username,password)->true) .log(true) .messageHandler(new MemoryMessageHandler()) .exception(throwable -> System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&"+throwable)) .start() .block(); serverSession.closeConnect("device-1").subscribe();// 关闭设备端 List connections= serverSession.getConnections().block(); // 获取所有链接 ``` #### 客户端使用说明 ```java RsocketClientSession clientSession= TransportClient.create("127.0.0.1",1884) .heart(10000) .protocol(ProtocolType.MQTT) // 指定协议 MQTT 包含 TCP/WS 两个端口 默认WS走的8443 WS协议 仅仅启动TCP协议 .ssl(false) // 开发tls加密 .log(true) // 打印报文日志 .onClose(()->{}) // 客户端关闭事件 .clientId("Comsumer_3") // 客户端id .password("12") // 密码 .username("123") // 用户名 .willMessage("123") // 遗嘱消息 .willTopic("/lose") // 遗嘱消息topic .willQos(MqttQoS.AT_LEAST_ONCE) // 遗嘱消息qos .exception(throwable -> System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&"+throwable)) // 异常处理 .messageAcceptor((topic,msg)->{ System.out.println(topic+":"+new String(msg)); }) // 消息接收处理 .connect() .block(); clientSession.sub("test").subscribe(); // 订阅 clientSession.pub("test","Producer_3".getBytes()).subscribe(); // 发布qos0消息 clientSession.pub("test","Producer_1".getBytes(),1).subscribe(); // 发布qos1消息 clientSession.pub("test","Producer_1".getBytes(),true,1).subscribe(); // 发布qos1消息 保留消息 ``` #### 服务端使用说明 ```spring 容器中使用 yaml 配置: iot: mqtt: server: enable: true host: 192.168.100.237 port: 8081 log: false protocol: MQTT heart: 100000 ssl: false 设备校验:实现AuthencationSession接口注入容器即可 异常处理:实现ExceptorAcceptor接口注入容器即可 保留消息处理:实现RsocketMessageHandler接口注入容器即可,默认使用内存。 ``` qq群号: 789331252 ### 关注公众号,输入 `物联网` 扫码加入微信交流群 ![image](image/icon.jpg)