Add PoolElaborator for token address enrichment
- Created PoolElaborator async function to retrieve token0 and token1 addresses from Uniswap v3 pools - Added ElaboratedSwapEvent DTO to store swap events with token addresses - Updated SwapEventLog with getAddress() method for pool contract access - Integrated PoolElaborator into DataStreamJob pipeline for real-time token enrichment - Configured dynamic RPC URL retrieval from job parameters
This commit is contained in:
@@ -27,6 +27,7 @@ import org.apache.flink.util.ParameterTool;
|
||||
import stream.contract.ArbitrumOne;
|
||||
import stream.dto.*;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
import stream.io.PoolElaborator;
|
||||
import stream.io.TokenElaborator;
|
||||
import stream.source.eventlog.EventLogSourceFactory;
|
||||
import stream.source.newheads.NewHeadsSourceFactory;
|
||||
@@ -94,6 +95,29 @@ public class DataStreamJob {
|
||||
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
||||
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
||||
|
||||
|
||||
SingleOutputStreamOperator<ElaboratedSwapEvent> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
||||
swapStream,
|
||||
new PoolElaborator(),
|
||||
30,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
// Print the elaborated swap events with token addresses
|
||||
elaboratedSwapStream
|
||||
.map(event -> {
|
||||
try {
|
||||
String json = mapper.writeValueAsString(event);
|
||||
log.info("Elaborated Swap Event - Token0: {}, Token1: {}",
|
||||
event.getToken0Address(), event.getToken1Address());
|
||||
return json;
|
||||
} catch (Exception e) {
|
||||
return "Error converting elaborated swap event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Elaborated Swap Event: ");
|
||||
|
||||
|
||||
// Map the blocks to pretty-printed JSON strings
|
||||
/*
|
||||
blockStream
|
||||
@@ -127,35 +151,35 @@ public class DataStreamJob {
|
||||
.print("Swap Event: ");
|
||||
*/
|
||||
|
||||
mintStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
} catch (Exception e) {
|
||||
return "Error converting mint event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Mint Event: ");
|
||||
// mintStream
|
||||
// .map(event -> {
|
||||
// try {
|
||||
// return mapper.writeValueAsString(event);
|
||||
// } catch (Exception e) {
|
||||
// return "Error converting mint event to JSON: " + e.getMessage();
|
||||
// }
|
||||
// })
|
||||
// .print("Mint Event: ");
|
||||
|
||||
burnStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
} catch (Exception e) {
|
||||
return "Error converting burn event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Burn Event: ");
|
||||
// burnStream
|
||||
// .map(event -> {
|
||||
// try {
|
||||
// return mapper.writeValueAsString(event);
|
||||
// } catch (Exception e) {
|
||||
// return "Error converting burn event to JSON: " + e.getMessage();
|
||||
// }
|
||||
// })
|
||||
// .print("Burn Event: ");
|
||||
|
||||
swapStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
} catch (Exception e) {
|
||||
return "Error converting swap event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Swap Event: ");
|
||||
// swapStream
|
||||
// .map(event -> {
|
||||
// try {
|
||||
// return mapper.writeValueAsString(event);
|
||||
// } catch (Exception e) {
|
||||
// return "Error converting swap event to JSON: " + e.getMessage();
|
||||
// }
|
||||
// })
|
||||
// .print("Swap Event: ");
|
||||
|
||||
env.execute("Ethereum Block Stream");
|
||||
}
|
||||
|
||||
42
src/main/java/stream/dto/ElaboratedSwapEvent.java
Normal file
42
src/main/java/stream/dto/ElaboratedSwapEvent.java
Normal file
@@ -0,0 +1,42 @@
|
||||
package stream.dto;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class ElaboratedSwapEvent implements Serializable {
|
||||
private SwapEventLog swapEvent;
|
||||
private String token0Address;
|
||||
private String token1Address;
|
||||
|
||||
public ElaboratedSwapEvent() {
|
||||
}
|
||||
|
||||
public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) {
|
||||
this.swapEvent = swapEvent;
|
||||
this.token0Address = token0Address;
|
||||
this.token1Address = token1Address;
|
||||
}
|
||||
|
||||
public SwapEventLog getSwapEvent() {
|
||||
return swapEvent;
|
||||
}
|
||||
|
||||
public void setSwapEvent(SwapEventLog swapEvent) {
|
||||
this.swapEvent = swapEvent;
|
||||
}
|
||||
|
||||
public String getToken0Address() {
|
||||
return token0Address;
|
||||
}
|
||||
|
||||
public void setToken0Address(String token0Address) {
|
||||
this.token0Address = token0Address;
|
||||
}
|
||||
|
||||
public String getToken1Address() {
|
||||
return token1Address;
|
||||
}
|
||||
|
||||
public void setToken1Address(String token1Address) {
|
||||
this.token1Address = token1Address;
|
||||
}
|
||||
}
|
||||
@@ -39,4 +39,7 @@ public class SwapEventLog extends EventLog implements Serializable {
|
||||
public BigInteger liquidity;
|
||||
public int tick;
|
||||
|
||||
public String getAddress() {
|
||||
return this.address;
|
||||
}
|
||||
}
|
||||
80
src/main/java/stream/io/PoolElaborator.java
Normal file
80
src/main/java/stream/io/PoolElaborator.java
Normal file
@@ -0,0 +1,80 @@
|
||||
package stream.io;
|
||||
|
||||
import stream.dto.SwapEventLog;
|
||||
import stream.dto.ElaboratedSwapEvent;
|
||||
import stream.contract.UniswapV3Pool;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import org.web3j.protocol.Web3j;
|
||||
import org.web3j.protocol.http.HttpService;
|
||||
import org.web3j.crypto.Credentials;
|
||||
import org.web3j.tx.gas.DefaultGasProvider;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.Collections;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.Map;
|
||||
|
||||
public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSwapEvent> {
|
||||
private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class);
|
||||
private transient Web3j web3j;
|
||||
private transient Credentials credentials;
|
||||
private transient DefaultGasProvider gasProvider;
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
super.open(openContext);
|
||||
|
||||
// Get RPC URL from job parameters
|
||||
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
|
||||
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||
// TODO: Get from configuration if needed
|
||||
|
||||
// Initialize Web3j
|
||||
this.web3j = Web3j.build(new HttpService(rpcUrl));
|
||||
|
||||
// Dummy credentials for read-only operations
|
||||
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
|
||||
|
||||
// Default gas provider
|
||||
this.gasProvider = new DefaultGasProvider();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(SwapEventLog swap, ResultFuture<ElaboratedSwapEvent> resultFuture) throws Exception {
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
// Load the pool contract
|
||||
UniswapV3Pool pool = UniswapV3Pool.load(
|
||||
swap.getAddress(), // Pool address from the event
|
||||
web3j,
|
||||
credentials,
|
||||
gasProvider
|
||||
);
|
||||
|
||||
// Get token addresses
|
||||
String token0 = pool.token0().send();
|
||||
String token1 = pool.token1().send();
|
||||
|
||||
// Create enriched event
|
||||
return new ElaboratedSwapEvent(swap, token0, token1);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Error fetching pool tokens", e);
|
||||
// Return original without enrichment
|
||||
return new ElaboratedSwapEvent(swap, null, null);
|
||||
}
|
||||
}).thenAccept(enriched -> {
|
||||
resultFuture.complete(Collections.singletonList(enriched));
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (web3j != null) {
|
||||
web3j.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user