From 2fc85c688bf646c3c616b6f08876d2691a94f62f Mon Sep 17 00:00:00 2001 From: surbhi Date: Wed, 8 Oct 2025 13:16:35 -0400 Subject: [PATCH] Using timestmap from blockhash. Also renaming elaborators to be more descriptive and removing unneeded logs and code --- src/main/java/stream/DataStreamJob.java | 94 ++++++++----------- .../java/stream/dto/ElaboratedSwapEvent.java | 42 --------- .../java/stream/dto/FullyElaboratedSwap.java | 42 --------- src/main/java/stream/io/PoolElaborator.java | 80 ---------------- src/main/java/stream/io/SwapEnricher.java | 94 ------------------- 5 files changed, 37 insertions(+), 315 deletions(-) delete mode 100644 src/main/java/stream/dto/ElaboratedSwapEvent.java delete mode 100644 src/main/java/stream/dto/FullyElaboratedSwap.java delete mode 100644 src/main/java/stream/io/PoolElaborator.java delete mode 100644 src/main/java/stream/io/SwapEnricher.java diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index cf8feb9..9068298 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -29,11 +29,11 @@ import org.apache.flink.util.ParameterTool; import org.apache.flink.util.Collector; import stream.dto.*; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import stream.io.PoolElaborator; +import stream.io.PoolTokenIdElaborator; import stream.io.TokenElaborator; -import stream.io.SwapEnricher; +import stream.io.SwapElaborator; +import stream.io.BlockTimestampElaborator; import stream.source.eventlog.EventLogSourceFactory; -import stream.source.newheads.NewHeadsSourceFactory; import java.util.HashMap; import java.util.Iterator; @@ -71,9 +71,6 @@ public class DataStreamJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameters); - // do not do this until considering how secrets are handled by flink -// env.getConfig().setGlobalJobParameters(parameters); - URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545")); URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546")); // Create ObjectMapper for pretty JSON printing @@ -81,43 +78,28 @@ public class DataStreamJob { mapper.enable(SerializationFeature.INDENT_OUTPUT); - DataStream arbitrumHeads = env - .fromSource( - NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class), - org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), - "ArbitrumOne Head Blocks", - TypeInformation.of(ArbitrumOneBlock.class) - ); - - DataStream mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events"); - DataStream burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events"); DataStream swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events"); - // Log raw swap events - swapStream.map(swapLog -> { - log.info("RAW SWAP EVENT - Pool: {}, Sender: {}, Recipient: {}, Amount0: {}, Amount1: {}, SqrtPriceX96: {}, Liquidity: {}, Tick: {}, TxHash: {}", - swapLog.address, swapLog.sender, swapLog.recipient, - swapLog.amount0, swapLog.amount1, swapLog.sqrtPriceX96, - swapLog.liquidity, swapLog.tick, swapLog.transactionHash); - return swapLog; - }); - - - - - - SingleOutputStreamOperator elaboratedSwapStream = AsyncDataStream.unorderedWait( + // Add block timestamp to swap events + SingleOutputStreamOperator swapWithTimestampStream = AsyncDataStream.unorderedWait( swapStream, - new PoolElaborator(), + new BlockTimestampElaborator(), + 30, + TimeUnit.SECONDS + ); + + SingleOutputStreamOperator elaboratedSwapStream = AsyncDataStream.unorderedWait( + swapWithTimestampStream, + new PoolTokenIdElaborator(), 30, TimeUnit.SECONDS ); // Extract token addresses and elaborate with metadata DataStream tokenAddresses = elaboratedSwapStream - .flatMap(new FlatMapFunction() { + .flatMap(new FlatMapFunction() { @Override - public void flatMap(ElaboratedSwapEvent event, Collector collector) throws Exception { + public void flatMap(SwapEventWithTokenIds event, Collector collector) throws Exception { collector.collect(new AddressId(42161, event.getToken0Address())); collector.collect(new AddressId(42161, event.getToken1Address())); } @@ -131,15 +113,15 @@ public class DataStreamJob { TimeUnit.SECONDS ); - // Connect swap events with token metadata to create FullyElaboratedSwap - DataStream fullyElaboratedSwaps = elaboratedSwapStream + // Connect swap events with token metadata to create SwapEventWithTokenMetadata + DataStream swapsWithTokens = elaboratedSwapStream .connect(elaboratedTokens) - .flatMap(new RichCoFlatMapFunction() { + .flatMap(new RichCoFlatMapFunction() { private final Map tokenCache = new HashMap<>(); - private final Map pendingSwaps = new HashMap<>(); + private final Map pendingSwaps = new HashMap<>(); @Override - public void flatMap1(ElaboratedSwapEvent event, Collector out) throws Exception { + public void flatMap1(SwapEventWithTokenIds event, Collector out) throws Exception { String token0Addr = event.getToken0Address().toLowerCase(); String token1Addr = event.getToken1Address().toLowerCase(); @@ -147,10 +129,9 @@ public class DataStreamJob { Token token1 = tokenCache.get(token1Addr); if (token0 != null && token1 != null) { - // We have both tokens, create FullyElaboratedSwap - SwapEventLog swapLog = event.getSwapEvent(); - FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1); - out.collect(fullSwap); + // We have both tokens, create SwapEventWithTokenMetadata + SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1); + out.collect(swapWithMetadata); } else { // Cache the swap event until we get both tokens pendingSwaps.put(event.getSwapEvent().transactionHash, event); @@ -158,38 +139,37 @@ public class DataStreamJob { } @Override - public void flatMap2(Token token, Collector out) throws Exception { + public void flatMap2(Token token, Collector out) throws Exception { // Cache the token tokenCache.put(token.address.toLowerCase(), token); - + // Check if any pending swaps can now be completed - Iterator> iterator = pendingSwaps.entrySet().iterator(); + Iterator> iterator = pendingSwaps.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - ElaboratedSwapEvent event = entry.getValue(); - + Map.Entry entry = iterator.next(); + SwapEventWithTokenIds event = entry.getValue(); + String token0Addr = event.getToken0Address().toLowerCase(); String token1Addr = event.getToken1Address().toLowerCase(); - + Token token0 = tokenCache.get(token0Addr); Token token1 = tokenCache.get(token1Addr); - + if (token0 != null && token1 != null) { - SwapEventLog swapLog = event.getSwapEvent(); - FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1); - out.collect(fullSwap); + SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1); + out.collect(swapWithMetadata); iterator.remove(); } } } }); - // Apply SwapEnricher to create final Swap objects with calculated prices - DataStream enrichedSwaps = fullyElaboratedSwaps - .map(new SwapEnricher()); + // Apply SwapElaborator to create final Swap objects with calculated prices + DataStream swaps = swapsWithTokens + .map(new SwapElaborator()); // Print the final enriched swap objects - enrichedSwaps + swaps .map(swap -> { try { String json = mapper.writeValueAsString(swap); diff --git a/src/main/java/stream/dto/ElaboratedSwapEvent.java b/src/main/java/stream/dto/ElaboratedSwapEvent.java deleted file mode 100644 index 1cbf1e5..0000000 --- a/src/main/java/stream/dto/ElaboratedSwapEvent.java +++ /dev/null @@ -1,42 +0,0 @@ -package stream.dto; - -import java.io.Serializable; - -public class ElaboratedSwapEvent implements Serializable { - private SwapEventLog swapEvent; - private String token0Address; - private String token1Address; - - public ElaboratedSwapEvent() { - } - - public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) { - this.swapEvent = swapEvent; - this.token0Address = token0Address; - this.token1Address = token1Address; - } - - public SwapEventLog getSwapEvent() { - return swapEvent; - } - - public void setSwapEvent(SwapEventLog swapEvent) { - this.swapEvent = swapEvent; - } - - public String getToken0Address() { - return token0Address; - } - - public void setToken0Address(String token0Address) { - this.token0Address = token0Address; - } - - public String getToken1Address() { - return token1Address; - } - - public void setToken1Address(String token1Address) { - this.token1Address = token1Address; - } -} \ No newline at end of file diff --git a/src/main/java/stream/dto/FullyElaboratedSwap.java b/src/main/java/stream/dto/FullyElaboratedSwap.java deleted file mode 100644 index 8237abc..0000000 --- a/src/main/java/stream/dto/FullyElaboratedSwap.java +++ /dev/null @@ -1,42 +0,0 @@ -package stream.dto; - -import java.io.Serializable; - -public class FullyElaboratedSwap implements Serializable { - private ElaboratedSwapEvent swapEvent; - private Token token0; - private Token token1; - - public FullyElaboratedSwap() { - } - - public FullyElaboratedSwap(ElaboratedSwapEvent swapEvent, Token token0, Token token1) { - this.swapEvent = swapEvent; - this.token0 = token0; - this.token1 = token1; - } - - public ElaboratedSwapEvent getSwapEvent() { - return swapEvent; - } - - public void setSwapEvent(ElaboratedSwapEvent swapEvent) { - this.swapEvent = swapEvent; - } - - public Token getToken0() { - return token0; - } - - public void setToken0(Token token0) { - this.token0 = token0; - } - - public Token getToken1() { - return token1; - } - - public void setToken1(Token token1) { - this.token1 = token1; - } -} \ No newline at end of file diff --git a/src/main/java/stream/io/PoolElaborator.java b/src/main/java/stream/io/PoolElaborator.java deleted file mode 100644 index d0e3e88..0000000 --- a/src/main/java/stream/io/PoolElaborator.java +++ /dev/null @@ -1,80 +0,0 @@ -package stream.io; - -import stream.dto.SwapEventLog; -import stream.dto.ElaboratedSwapEvent; -import stream.contract.UniswapV3Pool; - -import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; -import org.web3j.protocol.Web3j; -import org.web3j.protocol.http.HttpService; -import org.web3j.crypto.Credentials; -import org.web3j.tx.gas.DefaultGasProvider; -import java.util.concurrent.CompletableFuture; -import java.util.Collections; -import org.apache.flink.streaming.api.functions.async.ResultFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Map; - -public class PoolElaborator extends RichAsyncFunction { - private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class); - private transient Web3j web3j; - private transient Credentials credentials; - private transient DefaultGasProvider gasProvider; - - @Override - public void open(OpenContext openContext) throws Exception { - super.open(openContext); - - // Get RPC URL from job parameters - Map params = getRuntimeContext().getGlobalJobParameters(); - String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545"); - // TODO: Get from configuration if needed - - // Initialize Web3j - this.web3j = Web3j.build(new HttpService(rpcUrl)); - - // Dummy credentials for read-only operations - this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001"); - - // Default gas provider - this.gasProvider = new DefaultGasProvider(); - } - - @Override - public void asyncInvoke(SwapEventLog swap, ResultFuture resultFuture) throws Exception { - CompletableFuture.supplyAsync(() -> { - try { - // Load the pool contract - UniswapV3Pool pool = UniswapV3Pool.load( - swap.getAddress(), // Pool address from the event - web3j, - credentials, - gasProvider - ); - - // Get token addresses - String token0 = pool.token0().send(); - String token1 = pool.token1().send(); - - // Create enriched event - return new ElaboratedSwapEvent(swap, token0, token1); - - } catch (Exception e) { - log.error("Error fetching pool tokens", e); - // Return original without enrichment - return new ElaboratedSwapEvent(swap, null, null); - } - }).thenAccept(enriched -> { - resultFuture.complete(Collections.singletonList(enriched)); - }); - } - - @Override - public void close() throws Exception { - if (web3j != null) { - web3j.shutdown(); - } - } -} diff --git a/src/main/java/stream/io/SwapEnricher.java b/src/main/java/stream/io/SwapEnricher.java deleted file mode 100644 index b2f39d6..0000000 --- a/src/main/java/stream/io/SwapEnricher.java +++ /dev/null @@ -1,94 +0,0 @@ - -package stream.io; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import stream.dto.*; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.math.MathContext; -import java.math.RoundingMode; - -public class SwapEnricher extends RichMapFunction { - private static final Logger log = LoggerFactory.getLogger(SwapEnricher.class); - private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP); - private static final BigDecimal Q96 = new BigDecimal(2).pow(96); - - @Override - public Swap map(FullyElaboratedSwap fullSwap) throws Exception { - ElaboratedSwapEvent event = fullSwap.getSwapEvent(); - SwapEventLog swapLog = event.getSwapEvent(); - Token token0 = fullSwap.getToken0(); - Token token1 = fullSwap.getToken1(); - - // For now, hardcode exchange as UNISWAP_V3 and time as current time - Long time = System.currentTimeMillis(); - Exchange exchange = Exchange.UNISWAP_V3; - - // Pool address - String pool = swapLog.address; - - // Get sqrtPriceX96 and calculate actual price - BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96); - BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT); - BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT); - - // Adjust price for decimals (price is token1/token0) - int decimalDiff = token0.decimals - token1.decimals; - BigDecimal adjustedPrice; - if (decimalDiff >= 0) { - BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff); - adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT); - } else { - BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff); - adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT); - } - - // Determine which token is in and which is out based on amount signs - boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0; - - String takerAsset; - String makerAsset; - BigDecimal amountIn; - BigDecimal amountOut; - BigDecimal finalPrice; - - if (isToken0In) { - // User is sending token0, receiving token1 - takerAsset = event.getToken0Address(); - makerAsset = event.getToken1Address(); - - // Convert amounts to human-readable format using decimals - amountIn = new BigDecimal(swapLog.amount0.abs()) - .divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT); - amountOut = new BigDecimal(swapLog.amount1) - .divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT); - - // Price is how much token1 you get per token0 - finalPrice = adjustedPrice; - } else { - // User is sending token1, receiving token0 - takerAsset = event.getToken1Address(); - makerAsset = event.getToken0Address(); - - // Convert amounts to human-readable format using decimals - amountIn = new BigDecimal(swapLog.amount1.abs()) - .divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT); - amountOut = new BigDecimal(swapLog.amount0) - .divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT); - - // Price is how much token0 you get per token1 (inverse of adjustedPrice) - finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT); - } - - log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}", - pool, token0.symbol + "/" + token1.symbol, - isToken0In ? token1.symbol : token0.symbol, - amountIn, amountOut, finalPrice); - - // Pass both amountIn and amountOut to constructor with unit price - return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice); - } -} \ No newline at end of file