diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java index 40f540f3d55ae5ee3ecd23d2913c1eb26c091d01..ee03197f5527912d9bf8f94e4e17de7aa617d3e2 100644 --- a/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowEngineDefault.java @@ -27,6 +27,8 @@ import org.noear.solon.flow.driver.SimpleFlowDriver; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -585,28 +587,91 @@ public class FlowEngineDefault implements FlowEngine { protected boolean iterator_run_out(FlowDriver driver, FlowExchanger exchanger, Node node, int depth) { String forKey = node.getMeta("$for"); String inKey = node.getMeta("$in"); - Object inObj = exchanger.context().getAs(inKey); + Iterator inIterator = exchanger.getFlowIterator().get(inKey); - Iterator inIterator = null; - if (inObj instanceof Iterator) { - inIterator = (Iterator) inObj; - } else if (inObj instanceof Iterable) { - inIterator = ((Iterable) inObj).iterator(); - } else { + if (inIterator == null) { throw new FlowException(inKey + " is not a Iterable"); } - Stack iterator_stack = exchanger.temporary().stack(node.getChain(), "iterator_run"); iterator_stack.push(inIterator); //::流出 - while (inIterator.hasNext()) { - Object item = inIterator.next(); - exchanger.context().put(forKey, item); - node_run(driver, exchanger, node.getNextNode(), depth); - } + if (exchanger.context().executor() == null) { + while (inIterator.hasNext()) { + Object item = inIterator.next(); + exchanger.context().put(forKey, item); + node_run(driver, exchanger, node.getNextNode(), depth); + } + } else { + if (!exchanger.getFlowIterator().isEmpty(inKey)) { + // 当任务数量较少时,直接串行执行避免线程池问题 + int size = exchanger.getFlowIterator().size(inKey); + if (size <= 2) { + while (inIterator.hasNext()) { + Object item = inIterator.next(); + exchanger.context().put(forKey, item); + node_run(driver, exchanger, node.getNextNode(), depth); + } + return true; + } + + // 对于大量任务,采用分批执行策略避免线程池饥饿 + AtomicReference errorRef = new AtomicReference<>(); + ExecutorService executor = exchanger.context().executor(); + + // 计算批次大小,避免一次性占用过多线程 + int batchSize = Math.min(4, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); + CountDownLatch batchLatch = new CountDownLatch((int) Math.ceil((double) size / batchSize)); + + for (int i = 0; i < size; i += batchSize) { + final int start = i; + final int end = Math.min(i + batchSize, size); + + executor.execute(() -> { + try { + for (int j = start; j < end; j++) { + if (errorRef.get() != null) { + break; // 如果已经有错误,停止执行 + } + try { + exchanger.getFlowIterator().next(inKey, item -> { + exchanger.context().put(forKey, item); + node_run(driver, exchanger, node.getNextNode(), depth); + }); + } catch (Throwable t) { + errorRef.compareAndSet(null, t); + break; + } + } + } finally { + batchLatch.countDown(); + } + }); + } + + try { + long timeout = Math.max(30, (size / batchSize + 1) * 5); + if (!batchLatch.await(timeout, TimeUnit.SECONDS)) { + throw new FlowException("Iterator execution timeout after " + timeout + " seconds"); + } + } catch (InterruptedException e) { + // 取消所有未完成的任务 + Thread.currentThread().interrupt(); + throw new FlowException("Iterator execution interrupted", e); + } + + // 检查是否有异常 + if (errorRef.get() != null) { + if (errorRef.get() instanceof FlowException) { + throw (FlowException) errorRef.get(); + } else { + throw new FlowException(errorRef.get()); + } + } + } + } return true; } } \ No newline at end of file diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowExchanger.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowExchanger.java index f79bfa1e66e468e8244ba86180fbbec96f4c9e20..cc6115945c3e27edc54d3f934acf1d83a16ddd1b 100644 --- a/solon-flow/src/main/java/org/noear/solon/flow/FlowExchanger.java +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowExchanger.java @@ -19,6 +19,8 @@ import org.noear.liquor.eval.Scripts; import org.noear.solon.core.util.Assert; import org.noear.solon.lang.Preview; +import java.util.Iterator; + /** * 流交换器,表示一个流在一次运行时的可交换数据和状态(对内,不支持序列化) * @@ -30,6 +32,8 @@ import org.noear.solon.lang.Preview; public class FlowExchanger { public static final String TAG = "exchanger"; + volatile FlowIterator flowIterator = new FlowIteratorDefault(); + //当前流程上下文 private transient final FlowContext context; //当前流程引擎 @@ -137,4 +141,9 @@ public class FlowExchanger { public void interrupt(boolean interrupted) { this.interrupted = interrupted; } + + public FlowIterator getFlowIterator() { + return flowIterator; + } + } \ No newline at end of file diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java new file mode 100644 index 0000000000000000000000000000000000000000..ae921ab59c79c009b370148564b44c602c8405e3 --- /dev/null +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowIterator.java @@ -0,0 +1,26 @@ +package org.noear.solon.flow; + +import java.util.Iterator; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +public interface FlowIterator { + + Iterator get(String key); + + void put(String key, T... value); + + void put(String key, List value); + + boolean hasNext(String key); + + void next(String key, Consumer consumer); + + void remove(String key); + + int size(String key); + + boolean isEmpty(String key); + +} diff --git a/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java b/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java new file mode 100644 index 0000000000000000000000000000000000000000..c3f4972e3dba9ed786f641a1a434293428d808df --- /dev/null +++ b/solon-flow/src/main/java/org/noear/solon/flow/FlowIteratorDefault.java @@ -0,0 +1,101 @@ +package org.noear.solon.flow; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +public class FlowIteratorDefault implements FlowIterator { + + private transient final Map> iteratorMap = new ConcurrentHashMap<>(); + private final Map lockMap = new ConcurrentHashMap<>(); + + private String key(String key){ + return key; + } + + // 为每个迭代器获取专用锁 + private ReentrantLock getLock(String key) { + return lockMap.computeIfAbsent(key, k -> new ReentrantLock(false)); + } + + @Override + public Iterator get(String key){ + return iteratorMap.get(key(key)).iterator(); + } + + @Override + public boolean hasNext(String key) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + Iterator iterator = get(key(key)); + return iterator != null && iterator.hasNext(); + } finally { + lock.unlock(); + } + } + + @Override + public void next(String key, Consumer consumer) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + Iterator iterator = get(key(key)); + if (iterator == null) { + throw new IllegalStateException("Iterator not found for key: " + key); + } + while (iterator.hasNext()){ + consumer.accept(iterator.next()); + } + } finally { + lock.unlock(); + } + } + + @Override + public void remove(String key) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + iteratorMap.remove(key(key)); + lockMap.remove(key(key)); // 同时移除锁 + } finally { + lock.unlock(); + } + } + + @Override + public void put(String key, Object... value) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + iteratorMap.put(key(key), Arrays.asList(value)); + } finally { + lock.unlock(); + } + } + + @Override + public void put(String key, List value) { + ReentrantLock lock = getLock(key); + lock.lock(); + try { + iteratorMap.put(key(key), value); + } finally { + lock.unlock(); + } + } + + @Override + public int size(String key) { + return iteratorMap.get(key(key)).size(); + } + + @Override + public boolean isEmpty(String key) { + return iteratorMap.get(key(key)).isEmpty(); + } + + +} diff --git a/solon-flow/src/test/java/features/flow/cfg_iterator/IteratorTest.java b/solon-flow/src/test/java/features/flow/cfg_iterator/IteratorTest.java index d49a19e37020f72fc5b72a72207ec2f4796a02e2..581ad3d6842bb1925011dcb6cd0b83a8db57486b 100644 --- a/solon-flow/src/test/java/features/flow/cfg_iterator/IteratorTest.java +++ b/solon-flow/src/test/java/features/flow/cfg_iterator/IteratorTest.java @@ -8,6 +8,7 @@ import org.noear.solon.flow.FlowEngine; import org.noear.solon.test.SolonTest; import java.util.TreeMap; +import java.util.concurrent.Executors; @Slf4j @SolonTest @@ -38,4 +39,17 @@ public class IteratorTest { assert context.incrGet("b") == 3; assert context.incrGet("c") == 1; } + + @Test + public void cas3() { + FlowContext context = FlowContext.of(); + context.executor(Executors.newFixedThreadPool(10)); + + flowEngine.eval("fetch", context); + + context.remove("context"); + log.warn(new TreeMap<>(context.model()).toString()); + + assert 115 == context.model().size(); + } } diff --git a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduAlbum.java b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduAlbum.java index c79a5f9e31fdea92772602e40fc15546248d873a..ace4c9b512fdb8bcb8984cd1de04af3713e3470c 100644 --- a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduAlbum.java +++ b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduAlbum.java @@ -14,5 +14,7 @@ public class FetchBanduAlbum implements TaskComponent { public void run(FlowContext context, Node node) throws Throwable { String communityId = context.getAs("communityId"); context.put("albumIds", Arrays.asList(communityId + "_b1", communityId + "_b2", communityId + "_b3")); + + context.exchanger().getFlowIterator().put("albumIds", Arrays.asList(communityId + "_b1", communityId + "_b2", communityId + "_b3")); } } diff --git a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCommunity.java b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCommunity.java index d5271793b64c40d684d2ac457f1b706cebe2fb24..16dc82a76a6118cf89c33b243a33dd11477e4979 100644 --- a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCommunity.java +++ b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCommunity.java @@ -14,5 +14,7 @@ public class FetchBanduCommunity implements TaskComponent { @Override public void run(FlowContext context, Node node) throws Throwable { context.put("communityIds", Arrays.asList("a1", "a2", "a3")); + + context.exchanger().getFlowIterator().put("communityIds", "a1", "a2", "a3"); } } diff --git a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCourse.java b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCourse.java index d2ba1c40c38a15662f0797bc8ab6c23515939d4f..13e696302d8a083eede0cda634ef08ad98d62743 100644 --- a/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCourse.java +++ b/solon-flow/src/test/java/features/flow/cfg_iterator/com/FetchBanduCourse.java @@ -14,5 +14,7 @@ public class FetchBanduCourse implements TaskComponent { public void run(FlowContext context, Node node) throws Throwable { String albumId = context.getAs("albumId"); context.put("courseIds", Arrays.asList(albumId + "_c1", albumId + "_c2", albumId + "_c3")); + + context.exchanger().getFlowIterator().put("courseIds", Arrays.asList(albumId + "_c1", albumId + "_c2", albumId + "_c3")); } }