diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index eb690c5..cf8feb9 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -24,15 +24,21 @@ import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; import org.apache.flink.util.ParameterTool; import org.apache.flink.util.Collector; import stream.dto.*; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import stream.io.PoolElaborator; import stream.io.TokenElaborator; +import stream.io.SwapEnricher; import stream.source.eventlog.EventLogSourceFactory; import stream.source.newheads.NewHeadsSourceFactory; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -87,6 +93,18 @@ public class DataStreamJob { DataStream burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events"); DataStream swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events"); + // Log raw swap events + swapStream.map(swapLog -> { + log.info("RAW SWAP EVENT - Pool: {}, Sender: {}, Recipient: {}, Amount0: {}, Amount1: {}, SqrtPriceX96: {}, Liquidity: {}, Tick: {}, TxHash: {}", + swapLog.address, swapLog.sender, swapLog.recipient, + swapLog.amount0, swapLog.amount1, swapLog.sqrtPriceX96, + swapLog.liquidity, swapLog.tick, swapLog.transactionHash); + return swapLog; + }); + + + + SingleOutputStreamOperator elaboratedSwapStream = AsyncDataStream.unorderedWait( swapStream, @@ -113,35 +131,74 @@ public class DataStreamJob { TimeUnit.SECONDS ); - // Print comprehensive swap event data with token metadata - elaboratedSwapStream - .map(event -> { + // Connect swap events with token metadata to create FullyElaboratedSwap + DataStream fullyElaboratedSwaps = elaboratedSwapStream + .connect(elaboratedTokens) + .flatMap(new RichCoFlatMapFunction() { + private final Map tokenCache = new HashMap<>(); + private final Map pendingSwaps = new HashMap<>(); + + @Override + public void flatMap1(ElaboratedSwapEvent event, Collector out) throws Exception { + String token0Addr = event.getToken0Address().toLowerCase(); + String token1Addr = event.getToken1Address().toLowerCase(); + + Token token0 = tokenCache.get(token0Addr); + Token token1 = tokenCache.get(token1Addr); + + if (token0 != null && token1 != null) { + // We have both tokens, create FullyElaboratedSwap + SwapEventLog swapLog = event.getSwapEvent(); + FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1); + out.collect(fullSwap); + } else { + // Cache the swap event until we get both tokens + pendingSwaps.put(event.getSwapEvent().transactionHash, event); + } + } + + @Override + public void flatMap2(Token token, Collector out) throws Exception { + // Cache the token + tokenCache.put(token.address.toLowerCase(), token); + + // Check if any pending swaps can now be completed + Iterator> iterator = pendingSwaps.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + ElaboratedSwapEvent event = entry.getValue(); + + String token0Addr = event.getToken0Address().toLowerCase(); + String token1Addr = event.getToken1Address().toLowerCase(); + + Token token0 = tokenCache.get(token0Addr); + Token token1 = tokenCache.get(token1Addr); + + if (token0 != null && token1 != null) { + SwapEventLog swapLog = event.getSwapEvent(); + FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1); + out.collect(fullSwap); + iterator.remove(); + } + } + } + }); + + // Apply SwapEnricher to create final Swap objects with calculated prices + DataStream enrichedSwaps = fullyElaboratedSwaps + .map(new SwapEnricher()); + + // Print the final enriched swap objects + enrichedSwaps + .map(swap -> { try { - String json = mapper.writeValueAsString(event); - log.info("SWAP EVENT - Pool: {} Block: {} TxHash: {} Token0: {} Token1: {} Amount0: {} Amount1: {} Tick: {}", - event.getSwapEvent().address, - event.getSwapEvent().blockNumber, - event.getSwapEvent().transactionHash, - event.getToken0Address(), - event.getToken1Address(), - event.getSwapEvent().amount0, - event.getSwapEvent().amount1, - event.getSwapEvent().tick); + String json = mapper.writeValueAsString(swap); return json; } catch (Exception e) { - return "Error converting elaborated swap event to JSON: " + e.getMessage(); + return "Error converting enriched swap to JSON: " + e.getMessage(); } }) - .print("Swap Event: "); - - // Print token metadata when available - elaboratedTokens - .map(token -> { - log.info("TOKEN METADATA - Address: {} Name: {} Symbol: {} Decimals: {}", - token.address, token.name, token.symbol, token.decimals); - return token.toString(); - }) - .print("Token: "); + .print("Enriched Swap: "); env.execute("Ethereum Block Stream"); } diff --git a/src/main/java/stream/dto/ElaboratedEventType.java b/src/main/java/stream/dto/ElaboratedEventType.java index 0d20e64..adb41b5 100644 --- a/src/main/java/stream/dto/ElaboratedEventType.java +++ b/src/main/java/stream/dto/ElaboratedEventType.java @@ -7,6 +7,8 @@ import org.web3j.abi.datatypes.Bool; import org.web3j.abi.datatypes.Type; import org.web3j.abi.datatypes.Utf8String; import org.web3j.abi.datatypes.generated.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serial; import java.io.Serializable; @@ -20,6 +22,8 @@ import static stream.io.EthUtils.keccak; @SuppressWarnings("rawtypes") public class ElaboratedEventType extends EventType { + + private static final Logger LOG = LoggerFactory.getLogger(ElaboratedEventType.class); public transient String name; public transient List paramNames; @@ -63,7 +67,9 @@ public class ElaboratedEventType extends EventType { int dataValuesIndex = 0; int topicsIndex = 1; // the first topic is the event signature List args = new ArrayList<>(); - for (TypeReference paramType : this.paramTypes) { + for (int i = 0; i < this.paramTypes.size(); i++) { + TypeReference paramType = this.paramTypes.get(i); + String paramName = this.paramNames.get(i); Object value; if (paramType.isIndexed()) { String encoded = topics.get(topicsIndex++); diff --git a/src/main/java/stream/dto/Swap.java b/src/main/java/stream/dto/Swap.java index 1d88358..890a80d 100644 --- a/src/main/java/stream/dto/Swap.java +++ b/src/main/java/stream/dto/Swap.java @@ -8,23 +8,35 @@ public class Swap extends ChainId { @Serial private static final long serialVersionUID = 1L; - Long time; // milliseconds - Exchange exchange; - String pool; // address - String takerAsset; // token address - String makerAsset; // token address - BigDecimal amount; // positive means the taker bought; negative means the taker sold. - BigDecimal price; + public Long time; // milliseconds + public Exchange exchange; + public String pool; // address + public String takerAsset; // token address + public String makerAsset; // token address + public BigDecimal amountIn; // positive means the taker bought; negative means the taker sold. + public BigDecimal amountOut; // positive means the taker bought; negative means the taker sold. + public BigDecimal price; public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset, - BigDecimal amount, BigDecimal price) { + BigDecimal amountIn, BigDecimal amountOut, BigDecimal price) { super(chainId); this.time = time; this.exchange = exchange; this.pool = pool; this.takerAsset = takerAsset; this.makerAsset = makerAsset; - this.amount = amount; - this.price = price; + this.amountIn = amountIn; + this.amountOut = amountOut; + this.price = price; // Total trade value } + + // Getter methods + public Long getTime() { return time; } + public Exchange getExchange() { return exchange; } + public String getPool() { return pool; } + public String getTakerAsset() { return takerAsset; } + public String getMakerAsset() { return makerAsset; } + public BigDecimal getAmountIn() { return amountIn; } + public BigDecimal getAmountOut() { return amountOut; } + public BigDecimal getPrice() { return price; } } diff --git a/src/main/java/stream/dto/SwapEventLog.java b/src/main/java/stream/dto/SwapEventLog.java index 134cd47..c539f1a 100644 --- a/src/main/java/stream/dto/SwapEventLog.java +++ b/src/main/java/stream/dto/SwapEventLog.java @@ -29,13 +29,9 @@ public class SwapEventLog extends EventLog implements Serializable { public String sender; public String recipient; - @BigInt public BigInteger amount0; - @BigInt public BigInteger amount1; - @BigInt public BigInteger sqrtPriceX96; - @BigInt public BigInteger liquidity; public int tick;