Updating stream defaults like async call capacity, wait timeout, max idle connections, and keep-alive counts to ensure the stream doesn't fail. Moving everything to a config file for easy updates.
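
The new stream.config.StreamingDefaults class is referenced throughout the diff but not shown in it. A minimal sketch of what it could look like, assuming plain public constants; the constant names (ASYNC_CAPACITY, ASYNC_TIMEOUT_SECONDS, MAX_IDLE_CONNECTIONS, KEEP_ALIVE_MINUTES) are taken from the diff, the values are illustrative only:

package stream.config;

// Central defaults for the streaming job. Only the constant names are
// taken from the diff below; the values here are illustrative.
public final class StreamingDefaults {

    // Max number of in-flight requests per async operator
    public static final int ASYNC_CAPACITY = 100;

    // Timeout for a single async request, in seconds
    public static final int ASYNC_TIMEOUT_SECONDS = 120;

    // OkHttp connection pool: max idle connections to keep open
    public static final int MAX_IDLE_CONNECTIONS = 5;

    // OkHttp connection pool: keep-alive for idle connections, in minutes
    public static final int KEEP_ALIVE_MINUTES = 60;

    private StreamingDefaults() {
        // constants only, no instances
    }
}
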
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.util.ParameterTool;
 import org.apache.flink.util.Collector;
+import stream.config.StreamingDefaults;
 import stream.dto.*;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import stream.io.PoolTokenIdElaborator;
@@ -73,6 +74,11 @@ public class DataStreamJob {
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.getConfig().setGlobalJobParameters(parameters);
 URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
+
+// Async operation parameters
+int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY);
+int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS);
+log.info("Async configuration - Capacity: {}, Timeout: {}s", asyncCapacity, asyncTimeoutSeconds);

 // Create ObjectMapper for pretty JSON printing
 ObjectMapper mapper = new ObjectMapper();
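
Each getInt call falls back to the StreamingDefaults constant only when the key is absent, so both values can still be overridden per run. A small sketch, assuming parameters is a Flink ParameterTool built from the program arguments:

// Hypothetical overrides at submission time; keys that are not passed
// fall back to the StreamingDefaults constants.
ParameterTool parameters = ParameterTool.fromArgs(new String[] {
        "--async.capacity", "200",
        "--async.timeout.seconds", "60"
});
int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY);                     // 200
int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS); // 60
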
@@ -85,15 +91,17 @@ public class DataStreamJob {
 SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
 swapStream,
 new BlockTimestampElaborator(),
-120,
-TimeUnit.SECONDS
+asyncTimeoutSeconds,
+TimeUnit.SECONDS,
+asyncCapacity
 );

 SingleOutputStreamOperator<SwapEventWithTokenIds> elaboratedSwapStream = AsyncDataStream.unorderedWait(
 swapWithTimestampStream,
 new PoolTokenIdElaborator(),
-30,
-TimeUnit.SECONDS
+asyncTimeoutSeconds,
+TimeUnit.SECONDS,
+asyncCapacity
 );

 // Extract token addresses and elaborate with metadata
@@ -110,8 +118,9 @@ public class DataStreamJob {
 SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
 tokenAddresses,
 new TokenElaborator(),
-120,
-TimeUnit.SECONDS
+asyncTimeoutSeconds,
+TimeUnit.SECONDS,
+asyncCapacity
 );

 // Connect swap events with token metadata to create SwapEventWithTokenMetadata
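
All three AsyncDataStream.unorderedWait call sites move from the three-argument form (timeout only) to the five-argument overload, which also caps how many async requests may be in flight at once. A short sketch of the first call site, reusing the names from the hunks above, with the meaning of each argument spelled out:

// unorderedWait(stream, asyncFunction, timeout, timeUnit, capacity)
//  - asyncTimeoutSeconds / TimeUnit.SECONDS: how long one async request may
//    run before the async operator considers it failed
//  - asyncCapacity: max concurrent in-flight requests; once the limit is
//    reached the operator backpressures instead of queueing unboundedly
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
        swapStream,
        new BlockTimestampElaborator(),
        asyncTimeoutSeconds,
        TimeUnit.SECONDS,
        asyncCapacity);
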
@@ -16,11 +16,14 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;


 public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
 private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
 private transient Web3j w3;
+private static final AtomicInteger activeOperations = new AtomicInteger(0);
+private static final AtomicInteger totalOperations = new AtomicInteger(0);

 @Override
 public void open(OpenContext openContext) throws Exception {
@@ -30,6 +33,13 @@ public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {

 @Override
 public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
+int active = activeOperations.incrementAndGet();
+int total = totalOperations.incrementAndGet();
+
+if (total % 100 == 0) {
+log.info("BlockElaborator - Total operations: {}, Active operations: {}", total, active);
+}
+
 CompletableFuture.supplyAsync(() -> {
 try {
 return getBlock(blockId);
@@ -37,7 +47,14 @@ public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
 log.error("Failed to get block {} on chain {}", blockId, e);
 throw new RuntimeException("Error processing block " + blockId, e);
 }
-}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
+}).thenAccept(result -> {
+activeOperations.decrementAndGet();
+resultFuture.complete(Collections.singleton(result));
+}).exceptionally(throwable -> {
+activeOperations.decrementAndGet();
+resultFuture.completeExceptionally(throwable);
+return null;
+});
 }

 private EthBlock getBlock(BlockId id) {
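
The complete/decrement/exceptionally pattern above presumably has to be repeated in PoolTokenIdElaborator and TokenElaborator as well. A hedged sketch of a small helper that could centralize it; the AsyncCompletion class is hypothetical and not part of this commit:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Hypothetical helper: completes a Flink ResultFuture from a CompletableFuture
// and keeps the active-operations counter balanced on both the success and the
// failure path, so the async operator always sees a completed result.
public final class AsyncCompletion {

    private AsyncCompletion() {}

    public static <T> void complete(CompletableFuture<T> future,
                                    ResultFuture<T> resultFuture,
                                    AtomicInteger activeOperations) {
        future.thenAccept(result -> {
            activeOperations.decrementAndGet();
            resultFuture.complete(Collections.singleton(result));
        }).exceptionally(throwable -> {
            activeOperations.decrementAndGet();
            resultFuture.completeExceptionally(throwable);
            return null;
        });
    }
}

With such a helper, BlockElaborator.asyncInvoke could end with a single call: AsyncCompletion.complete(CompletableFuture.supplyAsync(() -> getBlock(blockId)), resultFuture, activeOperations);
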
@@ -46,11 +46,6 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
 adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
 }

-// Check if this is a WETH/USDC swap for debugging
-boolean isWethUsdcPool = false;
-String token0Lower = event.getToken0Address().toLowerCase();
-String token1Lower = event.getToken1Address().toLowerCase();
-
 // Determine which token is in and which is out based on amount signs
 boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
 String takerAsset;
@@ -6,6 +6,7 @@ import org.web3j.protocol.Web3j;
 import org.web3j.protocol.http.HttpService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import stream.config.StreamingDefaults;

 import java.time.Duration;
 import java.util.Arrays;
@@ -25,15 +26,19 @@ public class Web3Client {
 synchronized (w3Lock) {
 log.info("Initializing Web3 client");
 String url = params.getOrDefault("rpc_url", "http://localhost:8545");
-int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5"));
-int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60"));
+int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", String.valueOf(StreamingDefaults.MAX_IDLE_CONNECTIONS)));
+int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", String.valueOf(StreamingDefaults.KEEP_ALIVE_MINUTES)));
 Duration timeout = Duration.ofSeconds(30);
-log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
+
+ConnectionPool connectionPool = new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES);
+
+log.info("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
 url, maxIdleConnections, keepAlive, timeout.getSeconds());
+
 var httpClient = new OkHttpClient.Builder()
 .connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
 .connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
-.connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES))
+.connectionPool(connectionPool)
 .build();
 w3 = Web3j.build(new HttpService(url, httpClient));
 log.info("Web3 client initialized successfully");
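
For context on the Web3Client change, a minimal, self-contained sketch of how the pooled OkHttp client feeds into Web3j, assuming OkHttp 4.x and web3j's HttpService; the pool settings come from StreamingDefaults as in the diff, everything else (class name, method, URL handling) is illustrative:

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import stream.config.StreamingDefaults;

// Illustrative builder, not the project's Web3Client: shows how the pool
// settings map onto OkHttp and web3j.
public class Web3ClientSketch {

    public static Web3j build(String rpcUrl) {
        Duration timeout = Duration.ofSeconds(30);

        // Keep up to MAX_IDLE_CONNECTIONS idle connections open and evict
        // them after KEEP_ALIVE_MINUTES of inactivity.
        ConnectionPool pool = new ConnectionPool(
                StreamingDefaults.MAX_IDLE_CONNECTIONS,
                StreamingDefaults.KEEP_ALIVE_MINUTES,
                TimeUnit.MINUTES);

        OkHttpClient httpClient = new OkHttpClient.Builder()
                .connectTimeout(timeout)
                .callTimeout(timeout)
                .readTimeout(timeout)
                .writeTimeout(timeout)
                .connectionPool(pool)
                .build();

        return Web3j.build(new HttpService(rpcUrl, httpClient));
    }
}
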