From e58fdc0b06ba50f9b886c0c8f073d581e19ae2ad Mon Sep 17 00:00:00 2001 From: sjhavar Date: Mon, 6 Oct 2025 13:19:02 -0400 Subject: [PATCH] Add PoolElaborator for token address enrichment - Created PoolElaborator async function to retrieve token0 and token1 addresses from Uniswap v3 pools - Added ElaboratedSwapEvent DTO to store swap events with token addresses - Updated SwapEventLog with getAddress() method for pool contract access - Integrated PoolElaborator into DataStreamJob pipeline for real-time token enrichment - Configured dynamic RPC URL retrieval from job parameters --- src/main/java/stream/DataStreamJob.java | 78 +++++++++++------- .../java/stream/dto/ElaboratedSwapEvent.java | 42 ++++++++++ src/main/java/stream/dto/SwapEventLog.java | 3 + src/main/java/stream/io/PoolElaborator.java | 80 +++++++++++++++++++ 4 files changed, 176 insertions(+), 27 deletions(-) create mode 100644 src/main/java/stream/dto/ElaboratedSwapEvent.java create mode 100644 src/main/java/stream/io/PoolElaborator.java diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index 9a2b4d7..1182307 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -27,6 +27,7 @@ import org.apache.flink.util.ParameterTool; import stream.contract.ArbitrumOne; import stream.dto.*; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import stream.io.PoolElaborator; import stream.io.TokenElaborator; import stream.source.eventlog.EventLogSourceFactory; import stream.source.newheads.NewHeadsSourceFactory; @@ -94,6 +95,29 @@ public class DataStreamJob { DataStream burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events"); DataStream swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events"); + + SingleOutputStreamOperator elaboratedSwapStream = AsyncDataStream.unorderedWait( + swapStream, + new PoolElaborator(), + 30, + TimeUnit.SECONDS + ); + + // Print the elaborated swap events with token addresses + elaboratedSwapStream + .map(event -> { + try { + String json = mapper.writeValueAsString(event); + log.info("Elaborated Swap Event - Token0: {}, Token1: {}", + event.getToken0Address(), event.getToken1Address()); + return json; + } catch (Exception e) { + return "Error converting elaborated swap event to JSON: " + e.getMessage(); + } + }) + .print("Elaborated Swap Event: "); + + // Map the blocks to pretty-printed JSON strings /* blockStream @@ -127,35 +151,35 @@ public class DataStreamJob { .print("Swap Event: "); */ - mintStream - .map(event -> { - try { - return mapper.writeValueAsString(event); - } catch (Exception e) { - return "Error converting mint event to JSON: " + e.getMessage(); - } - }) - .print("Mint Event: "); +// mintStream +// .map(event -> { +// try { +// return mapper.writeValueAsString(event); +// } catch (Exception e) { +// return "Error converting mint event to JSON: " + e.getMessage(); +// } +// }) +// .print("Mint Event: "); - burnStream - .map(event -> { - try { - return mapper.writeValueAsString(event); - } catch (Exception e) { - return "Error converting burn event to JSON: " + e.getMessage(); - } - }) - .print("Burn Event: "); +// burnStream +// .map(event -> { +// try { +// return mapper.writeValueAsString(event); +// } catch (Exception e) { +// return "Error converting burn event to JSON: " + e.getMessage(); +// } +// }) +// .print("Burn Event: "); - swapStream - .map(event -> { - try { - return mapper.writeValueAsString(event); - } catch (Exception e) { - return "Error converting swap event to JSON: " + e.getMessage(); - } - }) - .print("Swap Event: "); +// swapStream +// .map(event -> { +// try { +// return mapper.writeValueAsString(event); +// } catch (Exception e) { +// return "Error converting swap event to JSON: " + e.getMessage(); +// } +// }) +// .print("Swap Event: "); env.execute("Ethereum Block Stream"); } diff --git a/src/main/java/stream/dto/ElaboratedSwapEvent.java b/src/main/java/stream/dto/ElaboratedSwapEvent.java new file mode 100644 index 0000000..1cbf1e5 --- /dev/null +++ b/src/main/java/stream/dto/ElaboratedSwapEvent.java @@ -0,0 +1,42 @@ +package stream.dto; + +import java.io.Serializable; + +public class ElaboratedSwapEvent implements Serializable { + private SwapEventLog swapEvent; + private String token0Address; + private String token1Address; + + public ElaboratedSwapEvent() { + } + + public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) { + this.swapEvent = swapEvent; + this.token0Address = token0Address; + this.token1Address = token1Address; + } + + public SwapEventLog getSwapEvent() { + return swapEvent; + } + + public void setSwapEvent(SwapEventLog swapEvent) { + this.swapEvent = swapEvent; + } + + public String getToken0Address() { + return token0Address; + } + + public void setToken0Address(String token0Address) { + this.token0Address = token0Address; + } + + public String getToken1Address() { + return token1Address; + } + + public void setToken1Address(String token1Address) { + this.token1Address = token1Address; + } +} \ No newline at end of file diff --git a/src/main/java/stream/dto/SwapEventLog.java b/src/main/java/stream/dto/SwapEventLog.java index 686b032..134cd47 100644 --- a/src/main/java/stream/dto/SwapEventLog.java +++ b/src/main/java/stream/dto/SwapEventLog.java @@ -39,4 +39,7 @@ public class SwapEventLog extends EventLog implements Serializable { public BigInteger liquidity; public int tick; + public String getAddress() { + return this.address; + } } \ No newline at end of file diff --git a/src/main/java/stream/io/PoolElaborator.java b/src/main/java/stream/io/PoolElaborator.java new file mode 100644 index 0000000..d0e3e88 --- /dev/null +++ b/src/main/java/stream/io/PoolElaborator.java @@ -0,0 +1,80 @@ +package stream.io; + +import stream.dto.SwapEventLog; +import stream.dto.ElaboratedSwapEvent; +import stream.contract.UniswapV3Pool; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.web3j.protocol.Web3j; +import org.web3j.protocol.http.HttpService; +import org.web3j.crypto.Credentials; +import org.web3j.tx.gas.DefaultGasProvider; +import java.util.concurrent.CompletableFuture; +import java.util.Collections; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Map; + +public class PoolElaborator extends RichAsyncFunction { + private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class); + private transient Web3j web3j; + private transient Credentials credentials; + private transient DefaultGasProvider gasProvider; + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + + // Get RPC URL from job parameters + Map params = getRuntimeContext().getGlobalJobParameters(); + String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545"); + // TODO: Get from configuration if needed + + // Initialize Web3j + this.web3j = Web3j.build(new HttpService(rpcUrl)); + + // Dummy credentials for read-only operations + this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001"); + + // Default gas provider + this.gasProvider = new DefaultGasProvider(); + } + + @Override + public void asyncInvoke(SwapEventLog swap, ResultFuture resultFuture) throws Exception { + CompletableFuture.supplyAsync(() -> { + try { + // Load the pool contract + UniswapV3Pool pool = UniswapV3Pool.load( + swap.getAddress(), // Pool address from the event + web3j, + credentials, + gasProvider + ); + + // Get token addresses + String token0 = pool.token0().send(); + String token1 = pool.token1().send(); + + // Create enriched event + return new ElaboratedSwapEvent(swap, token0, token1); + + } catch (Exception e) { + log.error("Error fetching pool tokens", e); + // Return original without enrichment + return new ElaboratedSwapEvent(swap, null, null); + } + }).thenAccept(enriched -> { + resultFuture.complete(Collections.singletonList(enriched)); + }); + } + + @Override + public void close() throws Exception { + if (web3j != null) { + web3j.shutdown(); + } + } +}