diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/README.md b/solon-cloud-event/rocketmq5-solon-cloud-plugin/README.md index 929be5157f9619a8a3b47e982803f6b21c0bca7a..e1e88a608c9b75e25aa262bbd5267e29148c59b6 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/README.md +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/README.md @@ -30,5 +30,7 @@ solon.cloud.rocketmq.event: consumeThreadNums: 0 #消费线程数,0表示默认 maxReconsumeTimes: 0 #消费消息失败的最大重试次数,0表示默认 producerGroup: "DEFAULT" #生产组 + consumerFilterType: SQL92 # 可以配置 TAG 不配置默认 TAG ,配置了SQL92, 注解中不生效 + consumerFilterExpression: "abc ='123'" # 具体参考文档 https://rocketmq.apache.org/zh/docs/featureBehavior/07messagefilter/ ``` \ No newline at end of file diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/MessageUtil.java b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/MessageUtil.java index 34cd507021cb3d5dd6841583b977975193315b61..69e5ebf0f7d7d51d21db7b902d95470ca32508e6 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/MessageUtil.java +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/MessageUtil.java @@ -48,7 +48,7 @@ class MessageUtil { //@since 3.0 for (Map.Entry kv : event.meta().entrySet()) { - messageBuilder.addProperty("!" + kv.getKey(), kv.getValue()); + messageBuilder.addProperty(kv.getKey(), kv.getValue()); } //设置消息Tag,用于消费端根据指定Tag过滤消息。 diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConfig.java b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConfig.java index 519672bdc7bf61d883675826460c0e33913805e6..9a287ff1244045d87a6a648fe4d5a07ee1fa6cc1 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConfig.java +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConfig.java @@ -34,7 +34,10 @@ public class RocketmqConfig { private static final String PROP_EVENT_consumeThreadNums = "event.consumeThreadNums"; private static final String PROP_EVENT_maxReconsumeTimes = "event.maxReconsumeTimes"; - + // 消费者的消息过滤类型, TAG / SQL92 + private static final String PROP_EVENT_consumerFilterType = "event.consumerFilterType"; + // 消费者的消息过滤表达式 SQL92 + private static final String PROP_EVENT_consumerFilterExpression = "event.consumerFilterExpression"; private String producerGroup; private String consumerGroup; @@ -53,7 +56,10 @@ public class RocketmqConfig { private final int maxReconsumeTimes; private final CloudProps cloudProps; - + // 消费者的消息过滤类型, TAG / SQL92 + private final String consumeFilterType; + // 消费者的消息过滤表达式 SQL92 + private final String consumeFilterExpression; public RocketmqConfig(CloudProps cloudProps) { this.cloudProps = cloudProps; @@ -72,7 +78,8 @@ public class RocketmqConfig { producerGroup = cloudProps.getValue(PROP_EVENT_producerGroup); consumerGroup = cloudProps.getValue(PROP_EVENT_consumerGroup); - + consumeFilterType = cloudProps.getValue(PROP_EVENT_consumerFilterType); + consumeFilterExpression = cloudProps.getValue(PROP_EVENT_consumerFilterExpression); if (Utils.isEmpty(producerGroup)) { producerGroup = "DEFAULT"; } @@ -80,6 +87,13 @@ public class RocketmqConfig { if (Utils.isEmpty(consumerGroup)) { consumerGroup = Solon.cfg().appGroup() + "_" + Solon.cfg().appName(); } + if (Utils.isEmpty(consumeFilterType)) { + consumerGroup = "TAG"; + } + + if (Utils.isEmpty(consumeFilterExpression) && "SQL92".equals(consumeFilterType)) { + throw new IllegalArgumentException("SQL92 filter expression is empty(event.consumerFilterExpression)"); + } log.trace("producerGroup=" + producerGroup); @@ -97,6 +111,14 @@ public class RocketmqConfig { return consumerGroup; } + public String getConsumeFilterType() { + return consumeFilterType; + } + + public String getConsumeFilterExpression() { + return consumeFilterExpression; + } + /** * 产品组 */ diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumer.java b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumer.java index 651f554af055468263cbf67901fab94dbf80b350..63fff51cd2654dd360c2cb8e4818882fa9362903 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumer.java +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumer.java @@ -17,6 +17,7 @@ package org.noear.solon.cloud.extend.rocketmq.impl; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder; import org.noear.solon.Utils; @@ -90,6 +91,8 @@ public class RocketmqConsumer implements Closeable { //账号密码 if (Utils.isNotEmpty(config.getAccessKey())) { builder.setCredentialProvider(new StaticSessionCredentialsProvider(config.getAccessKey(), config.getSecretKey())); + }else { + builder.enableSsl(false); } //发送超时时间,默认3000 单位ms if (config.getTimeout() > 0) { @@ -106,16 +109,23 @@ public class RocketmqConsumer implements Closeable { Collection tags = tagsObserverMap.getTagsByLevel(eventLevel); if (tags.size() > 0) { - String tagsExpr = String.join("||", tags); - //支持 tag 过滤 - if (tags.contains("*")) { - subscriptionExpressions.put(topic, FilterExpression.SUB_ALL); + + if ("SQL92".equals(config.getConsumeFilterType())) { + subscriptionExpressions.put(topic, new FilterExpression(config.getConsumeFilterExpression(), FilterExpressionType.SQL92)); + log.trace("Rocketmq consumer subscribe [" + topic + "(" + config.getConsumeFilterExpression() + ")] ok!"); } else { - subscriptionExpressions.put(topic, new FilterExpression(tagsExpr)); + String tagsExpr = String.join("||", tags); + + //支持 tag 过滤 + if (tags.contains("*")) { + subscriptionExpressions.put(topic, FilterExpression.SUB_ALL); + } else { + subscriptionExpressions.put(topic, new FilterExpression(tagsExpr)); + } + log.trace("Rocketmq5 consumer will subscribe [" + topic + "(" + tagsExpr + ")] ok!"); } - log.trace("Rocketmq5 consumer will subscribe [" + topic + "(" + tagsExpr + ")] ok!"); } } diff --git a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumerHandler.java b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumerHandler.java index 3316a76e65e218e9e0b234306f18ff1425d3b704..712de81cb8f3c20f117a238595431275cb67cedd 100644 --- a/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumerHandler.java +++ b/solon-cloud-event/rocketmq5-solon-cloud-plugin/src/main/java/org/noear/solon/cloud/extend/rocketmq/impl/RocketmqConsumerHandler.java @@ -77,9 +77,7 @@ public class RocketmqConsumerHandler implements MessageListener { //@since 3.0 if (Utils.isNotEmpty(message.getProperties())) { for (Map.Entry kv : message.getProperties().entrySet()) { - if (kv.getKey().startsWith("!")) { - event.meta().put(kv.getKey().substring(1), kv.getValue()); - } + event.meta().put(kv.getKey(), kv.getValue()); } //@since 3.1