From 255d8f126ddafc0fea8d7fdcf035fae8885a2bee Mon Sep 17 00:00:00 2001 From: surbhi Date: Fri, 10 Oct 2025 14:54:16 -0400 Subject: [PATCH] updating stream defauls like aysnc calls, wait timeout, max idle connection and keep alive counts to ensure the stream doesn't fail. Moving everything to a config file to easy updates --- src/main/java/stream/DataStreamJob.java | 21 ++++++++++++++------ src/main/java/stream/io/BlockElaborator.java | 19 +++++++++++++++++- src/main/java/stream/io/SwapElaborator.java | 5 ----- src/main/java/stream/io/Web3Client.java | 13 ++++++++---- 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index cacf582..c6f9ca4 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.util.ParameterTool; import org.apache.flink.util.Collector; +import stream.config.StreamingDefaults; import stream.dto.*; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import stream.io.PoolTokenIdElaborator; @@ -73,6 +74,11 @@ public class DataStreamJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameters); URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546")); + + // Async operation parameters + int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY); + int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS); + log.info("Async configuration - Capacity: {}, Timeout: {}s", asyncCapacity, asyncTimeoutSeconds); // Create ObjectMapper for pretty JSON printing ObjectMapper mapper = new ObjectMapper(); @@ -85,15 +91,17 @@ public class DataStreamJob { SingleOutputStreamOperator swapWithTimestampStream = AsyncDataStream.unorderedWait( swapStream, new BlockTimestampElaborator(), - 120, - TimeUnit.SECONDS + asyncTimeoutSeconds, + TimeUnit.SECONDS, + asyncCapacity ); SingleOutputStreamOperator elaboratedSwapStream = AsyncDataStream.unorderedWait( swapWithTimestampStream, new PoolTokenIdElaborator(), - 30, - TimeUnit.SECONDS + asyncTimeoutSeconds, + TimeUnit.SECONDS, + asyncCapacity ); // Extract token addresses and elaborate with metadata @@ -110,8 +118,9 @@ public class DataStreamJob { SingleOutputStreamOperator elaboratedTokens = AsyncDataStream.unorderedWait( tokenAddresses, new TokenElaborator(), - 120, - TimeUnit.SECONDS + asyncTimeoutSeconds, + TimeUnit.SECONDS, + asyncCapacity ); // Connect swap events with token metadata to create SwapEventWithTokenMetadata diff --git a/src/main/java/stream/io/BlockElaborator.java b/src/main/java/stream/io/BlockElaborator.java index f0964d1..44bfe0b 100644 --- a/src/main/java/stream/io/BlockElaborator.java +++ b/src/main/java/stream/io/BlockElaborator.java @@ -16,11 +16,14 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; public class BlockElaborator extends RichAsyncFunction { private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class); private transient Web3j w3; + private static final AtomicInteger activeOperations = new AtomicInteger(0); + private static final AtomicInteger totalOperations = new AtomicInteger(0); @Override public void open(OpenContext openContext) throws Exception { @@ -30,6 +33,13 @@ public class BlockElaborator extends RichAsyncFunction { @Override public void asyncInvoke(BlockId blockId, final ResultFuture resultFuture) { + int active = activeOperations.incrementAndGet(); + int total = totalOperations.incrementAndGet(); + + if (total % 100 == 0) { + log.info("BlockElaborator - Total operations: {}, Active operations: {}", total, active); + } + CompletableFuture.supplyAsync(() -> { try { return getBlock(blockId); @@ -37,7 +47,14 @@ public class BlockElaborator extends RichAsyncFunction { log.error("Failed to get block {} on chain {}", blockId, e); throw new RuntimeException("Error processing block " + blockId, e); } - }).thenAccept(result -> resultFuture.complete(Collections.singleton(result))); + }).thenAccept(result -> { + activeOperations.decrementAndGet(); + resultFuture.complete(Collections.singleton(result)); + }).exceptionally(throwable -> { + activeOperations.decrementAndGet(); + resultFuture.completeExceptionally(throwable); + return null; + }); } private EthBlock getBlock(BlockId id) { diff --git a/src/main/java/stream/io/SwapElaborator.java b/src/main/java/stream/io/SwapElaborator.java index 77ca42f..5259195 100644 --- a/src/main/java/stream/io/SwapElaborator.java +++ b/src/main/java/stream/io/SwapElaborator.java @@ -46,11 +46,6 @@ public class SwapElaborator extends RichMapFunction