From 267ff8baf20453910424e57937cc48d484eefe63 Mon Sep 17 00:00:00 2001 From: Surbhi Jhavar Date: Wed, 24 Sep 2025 15:25:00 -0400 Subject: [PATCH] Add Uniswap v3 swap event streaming - Added SwapEventLog data stream to DataStreamJob - Created SwapEventLog DTO for swap event processing - Added JSON mapping and printing for swap events --- src/main/java/stream/DataStreamJob.java | 11 ++++++ src/main/java/stream/dto/SwapEventLog.java | 42 ++++++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 src/main/java/stream/dto/SwapEventLog.java diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index 7b5767d..2a1f08b 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -76,6 +76,7 @@ public class DataStreamJob { DataStream mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events"); DataStream burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events"); + DataStream swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events"); // Map the blocks to pretty-printed JSON strings /* @@ -130,6 +131,16 @@ public class DataStreamJob { }) .print("Burn Event: "); + swapStream + .map(event -> { + try { + return mapper.writeValueAsString(event); + } catch (Exception e) { + return "Error converting swap event to JSON: " + e.getMessage(); + } + }) + .print("Swap Event: "); + env.execute("Ethereum Block Stream"); } diff --git a/src/main/java/stream/dto/SwapEventLog.java b/src/main/java/stream/dto/SwapEventLog.java new file mode 100644 index 0000000..686b032 --- /dev/null +++ b/src/main/java/stream/dto/SwapEventLog.java @@ -0,0 +1,42 @@ +package stream.dto; + +import java.io.Serializable; +import java.math.BigInteger; + + +/* +/// @notice Emitted by the pool for any swaps between token0 and token1 +/// @param sender The address that initiated the swap call +/// @param recipient The address that received the output of the swap +/// @param amount0 The delta of the token0 balance of the pool +/// @param amount1 The delta of the token1 balance of the pool +/// @param sqrtPriceX96 The sqrt(price) of the pool after the swap, as a Q64.96 +/// @param liquidity The liquidity of the pool after the swap +/// @param tick The log base 1.0001 of price of the pool after the swap +event Swap( + address sender, + address recipient, + int256 amount0, + int256 amount1, + uint160 sqrtPriceX96, + uint128 liquidity, + int24 tick +); +*/ + +public class SwapEventLog extends EventLog implements Serializable { + public static final String SIGNATURE = "Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)"; + + public String sender; + public String recipient; + @BigInt + public BigInteger amount0; + @BigInt + public BigInteger amount1; + @BigInt + public BigInteger sqrtPriceX96; + @BigInt + public BigInteger liquidity; + public int tick; + +} \ No newline at end of file