Add Uniswap v3 swap event streaming

- Added SwapEventLog data stream to DataStreamJob
- Created SwapEventLog DTO for swap event processing
- Added JSON mapping and printing for swap events
This commit is contained in:
2025-09-24 15:25:00 -04:00
parent 21f0f986d2
commit 267ff8baf2
2 changed files with 53 additions and 0 deletions

View File

@@ -76,6 +76,7 @@ public class DataStreamJob {
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events"); DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events"); DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
// Map the blocks to pretty-printed JSON strings // Map the blocks to pretty-printed JSON strings
/* /*
@@ -130,6 +131,16 @@ public class DataStreamJob {
}) })
.print("Burn Event: "); .print("Burn Event: ");
swapStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting swap event to JSON: " + e.getMessage();
}
})
.print("Swap Event: ");
env.execute("Ethereum Block Stream"); env.execute("Ethereum Block Stream");
} }

View File

@@ -0,0 +1,42 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
/*
/// @notice Emitted by the pool for any swaps between token0 and token1
/// @param sender The address that initiated the swap call
/// @param recipient The address that received the output of the swap
/// @param amount0 The delta of the token0 balance of the pool
/// @param amount1 The delta of the token1 balance of the pool
/// @param sqrtPriceX96 The sqrt(price) of the pool after the swap, as a Q64.96
/// @param liquidity The liquidity of the pool after the swap
/// @param tick The log base 1.0001 of price of the pool after the swap
event Swap(
address sender,
address recipient,
int256 amount0,
int256 amount1,
uint160 sqrtPriceX96,
uint128 liquidity,
int24 tick
);
*/
public class SwapEventLog extends EventLog implements Serializable {
public static final String SIGNATURE = "Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)";
public String sender;
public String recipient;
@BigInt
public BigInteger amount0;
@BigInt
public BigInteger amount1;
@BigInt
public BigInteger sqrtPriceX96;
@BigInt
public BigInteger liquidity;
public int tick;
}