From b7c2b2c5c59f19c00f69d059f4cf07dd9f52d6e1 Mon Sep 17 00:00:00 2001 From: zhangtianpei <619404259@qq.com> Date: Mon, 22 Dec 2025 12:26:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20rocketmq=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=20sql92=20=E5=8E=BB=E9=99=A4=C2=A0=20messageUtil=20?= =?UTF-8?q?=E4=B8=AD=20=E7=BB=99=20properties=20=E5=89=8D=E7=BC=80=20!=20R?= =?UTF-8?q?EADME.md=20=E9=85=8D=E7=BD=AE=E6=9B=B4=E6=96=B0=20RocketmqConfi?= =?UTF-8?q?g=E6=96=B0=E5=A2=9E=E9=85=8D=E7=BD=AE=20RocketmqConsumer=20?= =?UTF-8?q?=E6=96=B0=E5=A2=9ESQL92=E8=AE=A2=E9=98=85,=E5=B0=86build,=20?= =?UTF-8?q?=E5=A6=82=E6=9E=9C=E6=B2=A1ssl=E9=85=8D=E7=BD=AE,=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE=E6=88=90false=20RocketmqConsumerHandler=C2=A0=20?= =?UTF-8?q?=E5=8E=BB=E9=99=A4=E4=BA=86=E6=84=9F=E5=8F=B9=E5=8F=B7=E5=89=8D?= =?UTF-8?q?=E7=BC=80=E5=8C=B9=E9=85=8D,=E5=B0=86=E6=89=80=E6=9C=89?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E9=83=BD=E7=BB=99=E5=88=B0=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq5-solon-cloud-plugin/README.md | 2 ++ .../extend/rocketmq/impl/MessageUtil.java | 2 +- .../extend/rocketmq/impl/RocketmqConfig.java | 28 +++++++++++++++++-- .../rocketmq/impl/RocketmqConsumer.java | 22 +++++++++++---- .../impl/RocketmqConsumerHandler.java | 4 +-- 5 files changed, 45 insertions(+), 13 deletions(-) diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/README.md b/solon-cloud-event/rocketmq5-solon-cloud-plugin/README.md index 929be515..e1e88a60 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/README.md +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/README.md @@ -30,5 +30,7 @@ solon.cloud.rocketmq.event: consumeThreadNums: 0 #消费线程数,0表示默认 maxReconsumeTimes: 0 #消费消息失败的最大重试次数,0表示默认 producerGroup: "DEFAULT" #生产组 + consumerFilterType: SQL92 # 可以配置 TAG 不配置默认 TAG ,配置了SQL92, 注解中不生效 + consumerFilterExpression: "abc ='123'" # 具体参考文档 https://rocketmq.apache.org/zh/docs/featureBehavior/07messagefilter/ ``` \ No newline at end of file diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/MessageUtil.java b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/MessageUtil.java index 34cd5070..69e5ebf0 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/MessageUtil.java +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/MessageUtil.java @@ -48,7 +48,7 @@ class MessageUtil { //@since 3.0 for (Map.Entry kv : event.meta().entrySet()) { - messageBuilder.addProperty("!" + kv.getKey(), kv.getValue()); + messageBuilder.addProperty(kv.getKey(), kv.getValue()); } //设置消息Tag,用于消费端根据指定Tag过滤消息。 diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConfig.java b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConfig.java index 519672bd..9a287ff1 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConfig.java +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConfig.java @@ -34,7 +34,10 @@ public class RocketmqConfig { private static final String PROP_EVENT_consumeThreadNums = "event.consumeThreadNums"; private static final String PROP_EVENT_maxReconsumeTimes = "event.maxReconsumeTimes"; - + // 消费者的消息过滤类型, TAG / SQL92 + private static final String PROP_EVENT_consumerFilterType = "event.consumerFilterType"; + // 消费者的消息过滤表达式 SQL92 + private static final String PROP_EVENT_consumerFilterExpression = "event.consumerFilterExpression"; private String producerGroup; private String consumerGroup; @@ -53,7 +56,10 @@ public class RocketmqConfig { private final int maxReconsumeTimes; private final CloudProps cloudProps; - + // 消费者的消息过滤类型, TAG / SQL92 + private final String consumeFilterType; + // 消费者的消息过滤表达式 SQL92 + private final String consumeFilterExpression; public RocketmqConfig(CloudProps cloudProps) { this.cloudProps = cloudProps; @@ -72,7 +78,8 @@ public class RocketmqConfig { producerGroup = cloudProps.getValue(PROP_EVENT_producerGroup); consumerGroup = cloudProps.getValue(PROP_EVENT_consumerGroup); - + consumeFilterType = cloudProps.getValue(PROP_EVENT_consumerFilterType); + consumeFilterExpression = cloudProps.getValue(PROP_EVENT_consumerFilterExpression); if (Utils.isEmpty(producerGroup)) { producerGroup = "DEFAULT"; } @@ -80,6 +87,13 @@ public class RocketmqConfig { if (Utils.isEmpty(consumerGroup)) { consumerGroup = Solon.cfg().appGroup() + "_" + Solon.cfg().appName(); } + if (Utils.isEmpty(consumeFilterType)) { + consumerGroup = "TAG"; + } + + if (Utils.isEmpty(consumeFilterExpression) && "SQL92".equals(consumeFilterType)) { + throw new IllegalArgumentException("SQL92 filter expression is empty(event.consumerFilterExpression)"); + } log.trace("producerGroup=" + producerGroup); @@ -97,6 +111,14 @@ public class RocketmqConfig { return consumerGroup; } + public String getConsumeFilterType() { + return consumeFilterType; + } + + public String getConsumeFilterExpression() { + return consumeFilterExpression; + } + /** * 产品组 */ diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumer.java b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumer.java index 651f554a..63fff51c 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumer.java +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumer.java @@ -17,6 +17,7 @@ package org.noear.solon.cloud.extend.rocketmq.impl; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder; import org.noear.solon.Utils; @@ -90,6 +91,8 @@ public class RocketmqConsumer implements Closeable { //账号密码 if (Utils.isNotEmpty(config.getAccessKey())) { builder.setCredentialProvider(new StaticSessionCredentialsProvider(config.getAccessKey(), config.getSecretKey())); + }else { + builder.enableSsl(false); } //发送超时时间,默认3000 单位ms if (config.getTimeout() > 0) { @@ -106,16 +109,23 @@ public class RocketmqConsumer implements Closeable { Collection tags = tagsObserverMap.getTagsByLevel(eventLevel); if (tags.size() > 0) { - String tagsExpr = String.join("||", tags); - //支持 tag 过滤 - if (tags.contains("*")) { - subscriptionExpressions.put(topic, FilterExpression.SUB_ALL); + + if ("SQL92".equals(config.getConsumeFilterType())) { + subscriptionExpressions.put(topic, new FilterExpression(config.getConsumeFilterExpression(), FilterExpressionType.SQL92)); + log.trace("Rocketmq consumer subscribe [" + topic + "(" + config.getConsumeFilterExpression() + ")] ok!"); } else { - subscriptionExpressions.put(topic, new FilterExpression(tagsExpr)); + String tagsExpr = String.join("||", tags); + + //支持 tag 过滤 + if (tags.contains("*")) { + subscriptionExpressions.put(topic, FilterExpression.SUB_ALL); + } else { + subscriptionExpressions.put(topic, new FilterExpression(tagsExpr)); + } + log.trace("Rocketmq5 consumer will subscribe [" + topic + "(" + tagsExpr + ")] ok!"); } - log.trace("Rocketmq5 consumer will subscribe [" + topic + "(" + tagsExpr + ")] ok!"); } } diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumerHandler.java b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumerHandler.java index 3316a76e..712de81c 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumerHandler.java +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumerHandler.java @@ -77,9 +77,7 @@ public class RocketmqConsumerHandler implements MessageListener { //@since 3.0 if (Utils.isNotEmpty(message.getProperties())) { for (Map.Entry kv : message.getProperties().entrySet()) { - if (kv.getKey().startsWith("!")) { - event.meta().put(kv.getKey().substring(1), kv.getValue()); - } + event.meta().put(kv.getKey(), kv.getValue()); } //@since 3.1 -- Gitee