From 9db685340ce94b08c8341ed196d7ec0146a2f46b Mon Sep 17 00:00:00 2001 From: bai <1145000687@qq.com> Date: Fri, 26 Sep 2025 11:56:24 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(flow):=20=E6=94=AF=E6=8C=81=E8=BF=AD?= =?UTF-8?q?=E4=BB=A3=E5=99=A8=E5=A4=9A-=E7=BA=BF=E7=A8=8B=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=20=E5=BC=95=E5=85=A5=20FlowIterator=20=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E5=8F=8A=E9=BB=98=E8=AE=A4=E5=AE=9E=E7=8E=B0=20FlowIt?= =?UTF-8?q?eratorDefault=20-=20=E9=87=8D=E6=9E=84=20iterator=5Frun=5Fout?= =?UTF-8?q?=20=E6=96=B9=E6=B3=95=E6=94=AF=E6=8C=81=E5=A4=9A=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=89=A7=E8=A1=8C-=20=E6=B7=BB=E5=8A=A0=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E6=89=A7=E8=A1=8C=E9=80=BB=E8=BE=91=E4=B8=8E?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=88=86=E6=89=B9=E5=A4=84=E7=90=86=E7=AD=96?= =?UTF-8?q?=E7=95=A5=20-=20=E5=AE=9E=E7=8E=B0=E6=89=A7=E8=A1=8C=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E6=8E=A7=E5=88=B6=E4=B8=8E=E5=BC=82=E5=B8=B8=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=9C=BA=E5=88=B6=20-=20=E4=BC=98=E5=8C=96=E5=B0=8F?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E9=87=8F=E6=97=B6=E7=9A=84=E4=B8=B2=E8=A1=8C?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E9=81=BF=E5=85=8D=E7=BA=BF=E7=A8=8B=E5=BC=80?= =?UTF-8?q?=E9=94=80=20-=20=E5=A2=9E=E5=8A=A0=E8=BF=AD=E4=BB=A3=E5=99=A8?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E4=B8=8E=E7=AE=A1=E7=90=86=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=88=B0=20FlowExchanger=20-=20=E4=BD=BF=E7=94=A8=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E5=AE=89=E5=85=A8=E7=9A=84=E9=9B=86=E5=90=88=E4=B8=8E?= =?UTF-8?q?=E9=94=81=E6=9C=BA=E5=88=B6=E4=BF=9D=E8=AF=81=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E5=AE=89=E5=85=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../noear/solon/flow/FlowEngineDefault.java | 93 ++++++++++++++++--- .../org/noear/solon/flow/FlowExchanger.java | 9 ++ .../org/noear/solon/flow/FlowIterator.java | 22 +++++ .../noear/solon/flow/FlowIteratorDefault.java | 91 ++++++++++++++++++ .../flow/cfg_iterator/IteratorTest.java | 14 +++ .../cfg_iterator/com/FetchBanduAlbum.java | 2 + .../cfg_iterator/com/FetchBanduCommunity.java | 2 + .../cfg_iterator/com/FetchBanduCourse.java | 2 + 8 files changed, 222 insertions(+), 13 deletions(-) create mode 100644 solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java create mode 100644 solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java index 40f540f..81f8ac7 100644 --- a/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java @@ -27,6 +27,8 @@ import org.noear.solon.flow.driver.SimpleFlowDriver; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -585,28 +587,93 @@ public class FlowEngineDefault implements FlowEngine { protected boolean iterator_run_out(FlowDriver driver, FlowExchanger exchanger, Node node, int depth) { String forKey = node.getMeta("$for"); String inKey = node.getMeta("$in"); - Object inObj = exchanger.context().getAs(inKey); + Iterator inIterator = exchanger.getFlowIterator().get(inKey); - Iterator inIterator = null; - if (inObj instanceof Iterator) { - inIterator = (Iterator) inObj; - } else if (inObj instanceof Iterable) { - inIterator = ((Iterable) inObj).iterator(); - } else { + if (inIterator == null) { throw new FlowException(inKey + " is not a Iterable"); } - Stack iterator_stack = exchanger.temporary().stack(node.getChain(), "iterator_run"); iterator_stack.push(inIterator); //::流出 - while (inIterator.hasNext()) { - Object item = inIterator.next(); - exchanger.context().put(forKey, item); - node_run(driver, exchanger, node.getNextNode(), depth); - } + if (exchanger.context().executor() == null ) { + while (inIterator.hasNext()) { + Object item = inIterator.next(); + exchanger.context().put(forKey, item); + node_run(driver, exchanger, node.getNextNode(), depth); + } + } else { + + // 多线程执行 - 预先收集所有元素 + List items = new ArrayList<>(); + while (inIterator.hasNext()) { + items.add(inIterator.next()); + } + + if (!items.isEmpty()) { + // 当任务数量较少时,直接串行执行避免线程池问题 + if (items.size() <= 2) { + for (Object item : items) { + exchanger.context().put(forKey, item); + node_run(driver, exchanger, node.getNextNode(), depth); + } + return true; + } + // 对于大量任务,采用分批执行策略避免线程池饥饿 + AtomicReference errorRef = new AtomicReference<>(); + ExecutorService executor = exchanger.context().executor(); + + // 计算批次大小,避免一次性占用过多线程 + int batchSize = Math.min(4, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); + CountDownLatch batchLatch = new CountDownLatch((int) Math.ceil((double) items.size() / batchSize)); + + for (int i = 0; i < items.size(); i += batchSize) { + final int start = i; + final int end = Math.min(i + batchSize, items.size()); + + executor.execute(() -> { + try { + for (int j = start; j < end; j++) { + if (errorRef.get() != null) { + break; // 如果已经有错误,停止执行 + } + try { + exchanger.context().put(forKey, items.get(j)); + node_run(driver, exchanger, node.getNextNode(), depth); + } catch (Throwable t) { + errorRef.compareAndSet(null, t); + break; + } + } + } finally { + batchLatch.countDown(); + } + }); + } + + try { + long timeout = Math.max(30, (items.size() / batchSize + 1) * 5); + if (!batchLatch.await(timeout, TimeUnit.SECONDS)) { + throw new FlowException("Iterator execution timeout after " + timeout + " seconds"); + } + } catch (InterruptedException e) { + // 取消所有未完成的任务 + Thread.currentThread().interrupt(); + throw new FlowException("Iterator execution interrupted", e); + } + + // 检查是否有异常 + if (errorRef.get() != null) { + if (errorRef.get() instanceof FlowException) { + throw (FlowException) errorRef.get(); + } else { + throw new FlowException(errorRef.get()); + } + } + } + } return true; } } \ No newline at end of file diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowExchanger.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowExchanger.java index f79bfa1..cc61159 100644 --- a/solon-flow/src/main/java/org/noear/solon/flow/FlowExchanger.java +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowExchanger.java @@ -19,6 +19,8 @@ import org.noear.liquor.eval.Scripts; import org.noear.solon.core.util.Assert; import org.noear.solon.lang.Preview; +import java.util.Iterator; + /** * 流交换器,表示一个流在一次运行时的可交换数据和状态(对内,不支持序列化) * @@ -30,6 +32,8 @@ import org.noear.solon.lang.Preview; public class FlowExchanger { public static final String TAG = "exchanger"; + volatile FlowIterator flowIterator = new FlowIteratorDefault(); + //当前流程上下文 private transient final FlowContext context; //当前流程引擎 @@ -137,4 +141,9 @@ public class FlowExchanger { public void interrupt(boolean interrupted) { this.interrupted = interrupted; } + + public FlowIterator getFlowIterator() { + return flowIterator; + } + } \ No newline at end of file diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java new file mode 100644 index 0000000..5f04238 --- /dev/null +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java @@ -0,0 +1,22 @@ +package org.noear.solon.flow; + +import java.util.Iterator; +import java.util.List; + +public interface FlowIterator { + + Iterator get(String key); + + void put(String key, T... value); + + void put(String key, List value); + + boolean hasNext(String key); + + public Object next(String key); + + public void remove(String key); + + int size(String key); + +} diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java new file mode 100644 index 0000000..288d43d --- /dev/null +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java @@ -0,0 +1,91 @@ +package org.noear.solon.flow; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +public class FlowIteratorDefault implements FlowIterator { + + private transient final Map> iteratorMap = new ConcurrentHashMap<>(); + private final Map lockMap = new ConcurrentHashMap<>(); + + private String key(String key){ + return key; + } + + // 为每个迭代器获取专用锁 + private ReentrantLock getLock(String key) { + return lockMap.computeIfAbsent(key, k -> new ReentrantLock(false)); + } + + @Override + public Iterator get(String key){ + return iteratorMap.get(key(key)).iterator(); + } + + @Override + public boolean hasNext(String key) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + Iterator iterator = get(key(key)); + return iterator != null && iterator.hasNext(); + } finally { + lock.unlock(); + } + } + + @Override + public Object next(String key) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + Iterator iterator = get(key(key)); + if (iterator == null) { + throw new IllegalStateException("Iterator not found for key: " + key); + } + return iterator.next(); + } finally { + lock.unlock(); + } + } + + @Override + public void remove(String key) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + iteratorMap.remove(key(key)); + lockMap.remove(key(key)); // 同时移除锁 + } finally { + lock.unlock(); + } + } + + @Override + public void put(String key, Object... value) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + iteratorMap.put(key(key), Arrays.asList(value)); + } finally { + lock.unlock(); + } + } + + @Override + public void put(String key, List value) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + iteratorMap.put(key(key), value); + } finally { + lock.unlock(); + } + } + + @Override + public int size(String key) { + return iteratorMap.get(key(key)).size(); + } +} diff --git a/solon-flow/src/test/java/features/flow/cfg_iterator/IteratorTest.java b/solon-flow/src/test/java/features/flow/cfg_iterator/IteratorTest.java index d49a19e..581ad3d 100644 --- a/solon-flow/src/test/java/features/flow/cfg_iterator/IteratorTest.java +++ b/solon-flow/src/test/java/features/flow/cfg_iterator/IteratorTest.java @@ -8,6 +8,7 @@ import org.noear.solon.flow.FlowEngine; import org.noear.solon.test.SolonTest; import java.util.TreeMap; +import java.util.concurrent.Executors; @Slf4j @SolonTest @@ -38,4 +39,17 @@ public class IteratorTest { assert context.incrGet("b") == 3; assert context.incrGet("c") == 1; } + + @Test + public void cas3() { + FlowContext context = FlowContext.of(); + context.executor(Executors.newFixedThreadPool(10)); + + flowEngine.eval("fetch", context); + + context.remove("context"); + log.warn(new TreeMap<>(context.model()).toString()); + + assert 115 == context.model().size(); + } } diff --git a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduAlbum.java b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduAlbum.java index c79a5f9..ace4c9b 100644 --- a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduAlbum.java +++ b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduAlbum.java @@ -14,5 +14,7 @@ public class FetchBanduAlbum implements TaskComponent { public void run(FlowContext context, Node node) throws Throwable { String communityId = context.getAs("communityId"); context.put("albumIds", Arrays.asList(communityId + "_b1", communityId + "_b2", communityId + "_b3")); + + context.exchanger().getFlowIterator().put("albumIds", Arrays.asList(communityId + "_b1", communityId + "_b2", communityId + "_b3")); } } diff --git a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCommunity.java b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCommunity.java index d527179..16dc82a 100644 --- a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCommunity.java +++ b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCommunity.java @@ -14,5 +14,7 @@ public class FetchBanduCommunity implements TaskComponent { @Override public void run(FlowContext context, Node node) throws Throwable { context.put("communityIds", Arrays.asList("a1", "a2", "a3")); + + context.exchanger().getFlowIterator().put("communityIds", "a1", "a2", "a3"); } } diff --git a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCourse.java b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCourse.java index d2ba1c4..13e6963 100644 --- a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCourse.java +++ b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCourse.java @@ -14,5 +14,7 @@ public class FetchBanduCourse implements TaskComponent { public void run(FlowContext context, Node node) throws Throwable { String albumId = context.getAs("albumId"); context.put("courseIds", Arrays.asList(albumId + "_c1", albumId + "_c2", albumId + "_c3")); + + context.exchanger().getFlowIterator().put("courseIds", Arrays.asList(albumId + "_c1", albumId + "_c2", albumId + "_c3")); } } -- Gitee From ea86c01604ea098dc2628a1373523c3844297cd7 Mon Sep 17 00:00:00 2001 From: bai <1145000687@qq.com> Date: Sun, 28 Sep 2025 10:47:55 +0800 Subject: [PATCH 2/2] =?UTF-8?q?refactor(flow):=E4=BC=98=E5=8C=96=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E8=BF=AD=E4=BB=A3=E5=99=A8=E6=89=A7=E8=A1=8C=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 调整空格格式问题以符合代码规范- 移除预先收集元素的多线程执行方式 - 使用流式迭代处理替代批量列表操作- 添加对迭代器空状态的判断支持 - 修改迭代器接口及默认实现以支持消费型回调 -优化线程池任务分批大小计算逻辑 - 增加执行超时时间动态调整机制 --- .../noear/solon/flow/FlowEngineDefault.java | 30 +++++++++---------- .../org/noear/solon/flow/FlowIterator.java | 8 +++-- .../noear/solon/flow/FlowIteratorDefault.java | 14 +++++++-- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java index 81f8ac7..ee03197 100644 --- a/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java @@ -597,7 +597,7 @@ public class FlowEngineDefault implements FlowEngine { iterator_stack.push(inIterator); //::流出 - if (exchanger.context().executor() == null ) { + if (exchanger.context().executor() == null) { while (inIterator.hasNext()) { Object item = inIterator.next(); exchanger.context().put(forKey, item); @@ -605,16 +605,12 @@ public class FlowEngineDefault implements FlowEngine { } } else { - // 多线程执行 - 预先收集所有元素 - List items = new ArrayList<>(); - while (inIterator.hasNext()) { - items.add(inIterator.next()); - } - - if (!items.isEmpty()) { + if (!exchanger.getFlowIterator().isEmpty(inKey)) { // 当任务数量较少时,直接串行执行避免线程池问题 - if (items.size() <= 2) { - for (Object item : items) { + int size = exchanger.getFlowIterator().size(inKey); + if (size <= 2) { + while (inIterator.hasNext()) { + Object item = inIterator.next(); exchanger.context().put(forKey, item); node_run(driver, exchanger, node.getNextNode(), depth); } @@ -627,11 +623,11 @@ public class FlowEngineDefault implements FlowEngine { // 计算批次大小,避免一次性占用过多线程 int batchSize = Math.min(4, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); - CountDownLatch batchLatch = new CountDownLatch((int) Math.ceil((double) items.size() / batchSize)); + CountDownLatch batchLatch = new CountDownLatch((int) Math.ceil((double) size / batchSize)); - for (int i = 0; i < items.size(); i += batchSize) { + for (int i = 0; i < size; i += batchSize) { final int start = i; - final int end = Math.min(i + batchSize, items.size()); + final int end = Math.min(i + batchSize, size); executor.execute(() -> { try { @@ -640,8 +636,10 @@ public class FlowEngineDefault implements FlowEngine { break; // 如果已经有错误,停止执行 } try { - exchanger.context().put(forKey, items.get(j)); - node_run(driver, exchanger, node.getNextNode(), depth); + exchanger.getFlowIterator().next(inKey, item -> { + exchanger.context().put(forKey, item); + node_run(driver, exchanger, node.getNextNode(), depth); + }); } catch (Throwable t) { errorRef.compareAndSet(null, t); break; @@ -654,7 +652,7 @@ public class FlowEngineDefault implements FlowEngine { } try { - long timeout = Math.max(30, (items.size() / batchSize + 1) * 5); + long timeout = Math.max(30, (size / batchSize + 1) * 5); if (!batchLatch.await(timeout, TimeUnit.SECONDS)) { throw new FlowException("Iterator execution timeout after " + timeout + " seconds"); } diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java index 5f04238..ae921ab 100644 --- a/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java @@ -2,6 +2,8 @@ package org.noear.solon.flow; import java.util.Iterator; import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; public interface FlowIterator { @@ -13,10 +15,12 @@ public interface FlowIterator { boolean hasNext(String key); - public Object next(String key); + void next(String key, Consumer consumer); - public void remove(String key); + void remove(String key); int size(String key); + boolean isEmpty(String key); + } diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java index 288d43d..c3f4972 100644 --- a/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java @@ -3,6 +3,7 @@ package org.noear.solon.flow; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; public class FlowIteratorDefault implements FlowIterator { @@ -36,7 +37,7 @@ public class FlowIteratorDefault implements FlowIterator { } @Override - public Object next(String key) { + public void next(String key, Consumer consumer) { ReentrantLock lock = getLock(key); lock.lock(); try { @@ -44,7 +45,9 @@ public class FlowIteratorDefault implements FlowIterator { if (iterator == null) { throw new IllegalStateException("Iterator not found for key: " + key); } - return iterator.next(); + while (iterator.hasNext()){ + consumer.accept(iterator.next()); + } } finally { lock.unlock(); } @@ -88,4 +91,11 @@ public class FlowIteratorDefault implements FlowIterator { public int size(String key) { return iteratorMap.get(key(key)).size(); } + + @Override + public boolean isEmpty(String key) { + return iteratorMap.get(key(key)).isEmpty(); + } + + } -- Gitee