Using timestmap from blockhash. Also renaming elaborators to be more descriptive and removing unneeded logs and code
This commit is contained in:
@@ -29,11 +29,11 @@ import org.apache.flink.util.ParameterTool;
|
||||
import org.apache.flink.util.Collector;
|
||||
import stream.dto.*;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
import stream.io.PoolElaborator;
|
||||
import stream.io.PoolTokenIdElaborator;
|
||||
import stream.io.TokenElaborator;
|
||||
import stream.io.SwapEnricher;
|
||||
import stream.io.SwapElaborator;
|
||||
import stream.io.BlockTimestampElaborator;
|
||||
import stream.source.eventlog.EventLogSourceFactory;
|
||||
import stream.source.newheads.NewHeadsSourceFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
@@ -71,9 +71,6 @@ public class DataStreamJob {
|
||||
|
||||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
env.getConfig().setGlobalJobParameters(parameters);
|
||||
// do not do this until considering how secrets are handled by flink
|
||||
// env.getConfig().setGlobalJobParameters(parameters);
|
||||
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
|
||||
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
|
||||
|
||||
// Create ObjectMapper for pretty JSON printing
|
||||
@@ -81,43 +78,28 @@ public class DataStreamJob {
|
||||
mapper.enable(SerializationFeature.INDENT_OUTPUT);
|
||||
|
||||
|
||||
DataStream<ArbitrumOneBlock> arbitrumHeads = env
|
||||
.fromSource(
|
||||
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
|
||||
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
|
||||
"ArbitrumOne Head Blocks",
|
||||
TypeInformation.of(ArbitrumOneBlock.class)
|
||||
);
|
||||
|
||||
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
|
||||
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
||||
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
||||
|
||||
// Log raw swap events
|
||||
swapStream.map(swapLog -> {
|
||||
log.info("RAW SWAP EVENT - Pool: {}, Sender: {}, Recipient: {}, Amount0: {}, Amount1: {}, SqrtPriceX96: {}, Liquidity: {}, Tick: {}, TxHash: {}",
|
||||
swapLog.address, swapLog.sender, swapLog.recipient,
|
||||
swapLog.amount0, swapLog.amount1, swapLog.sqrtPriceX96,
|
||||
swapLog.liquidity, swapLog.tick, swapLog.transactionHash);
|
||||
return swapLog;
|
||||
});
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
SingleOutputStreamOperator<ElaboratedSwapEvent> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
||||
// Add block timestamp to swap events
|
||||
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
|
||||
swapStream,
|
||||
new PoolElaborator(),
|
||||
new BlockTimestampElaborator(),
|
||||
30,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
SingleOutputStreamOperator<SwapEventWithTokenIds> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
||||
swapWithTimestampStream,
|
||||
new PoolTokenIdElaborator(),
|
||||
30,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
// Extract token addresses and elaborate with metadata
|
||||
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
|
||||
.flatMap(new FlatMapFunction<ElaboratedSwapEvent, AddressId>() {
|
||||
.flatMap(new FlatMapFunction<SwapEventWithTokenIds, AddressId>() {
|
||||
@Override
|
||||
public void flatMap(ElaboratedSwapEvent event, Collector<AddressId> collector) throws Exception {
|
||||
public void flatMap(SwapEventWithTokenIds event, Collector<AddressId> collector) throws Exception {
|
||||
collector.collect(new AddressId(42161, event.getToken0Address()));
|
||||
collector.collect(new AddressId(42161, event.getToken1Address()));
|
||||
}
|
||||
@@ -131,15 +113,15 @@ public class DataStreamJob {
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
// Connect swap events with token metadata to create FullyElaboratedSwap
|
||||
DataStream<FullyElaboratedSwap> fullyElaboratedSwaps = elaboratedSwapStream
|
||||
// Connect swap events with token metadata to create SwapEventWithTokenMetadata
|
||||
DataStream<SwapEventWithTokenMetadata> swapsWithTokens = elaboratedSwapStream
|
||||
.connect(elaboratedTokens)
|
||||
.flatMap(new RichCoFlatMapFunction<ElaboratedSwapEvent, Token, FullyElaboratedSwap>() {
|
||||
.flatMap(new RichCoFlatMapFunction<SwapEventWithTokenIds, Token, SwapEventWithTokenMetadata>() {
|
||||
private final Map<String, Token> tokenCache = new HashMap<>();
|
||||
private final Map<String, ElaboratedSwapEvent> pendingSwaps = new HashMap<>();
|
||||
private final Map<String, SwapEventWithTokenIds> pendingSwaps = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void flatMap1(ElaboratedSwapEvent event, Collector<FullyElaboratedSwap> out) throws Exception {
|
||||
public void flatMap1(SwapEventWithTokenIds event, Collector<SwapEventWithTokenMetadata> out) throws Exception {
|
||||
String token0Addr = event.getToken0Address().toLowerCase();
|
||||
String token1Addr = event.getToken1Address().toLowerCase();
|
||||
|
||||
@@ -147,10 +129,9 @@ public class DataStreamJob {
|
||||
Token token1 = tokenCache.get(token1Addr);
|
||||
|
||||
if (token0 != null && token1 != null) {
|
||||
// We have both tokens, create FullyElaboratedSwap
|
||||
SwapEventLog swapLog = event.getSwapEvent();
|
||||
FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1);
|
||||
out.collect(fullSwap);
|
||||
// We have both tokens, create SwapEventWithTokenMetadata
|
||||
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
|
||||
out.collect(swapWithMetadata);
|
||||
} else {
|
||||
// Cache the swap event until we get both tokens
|
||||
pendingSwaps.put(event.getSwapEvent().transactionHash, event);
|
||||
@@ -158,15 +139,15 @@ public class DataStreamJob {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flatMap2(Token token, Collector<FullyElaboratedSwap> out) throws Exception {
|
||||
public void flatMap2(Token token, Collector<SwapEventWithTokenMetadata> out) throws Exception {
|
||||
// Cache the token
|
||||
tokenCache.put(token.address.toLowerCase(), token);
|
||||
|
||||
// Check if any pending swaps can now be completed
|
||||
Iterator<Map.Entry<String, ElaboratedSwapEvent>> iterator = pendingSwaps.entrySet().iterator();
|
||||
Iterator<Map.Entry<String, SwapEventWithTokenIds>> iterator = pendingSwaps.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<String, ElaboratedSwapEvent> entry = iterator.next();
|
||||
ElaboratedSwapEvent event = entry.getValue();
|
||||
Map.Entry<String, SwapEventWithTokenIds> entry = iterator.next();
|
||||
SwapEventWithTokenIds event = entry.getValue();
|
||||
|
||||
String token0Addr = event.getToken0Address().toLowerCase();
|
||||
String token1Addr = event.getToken1Address().toLowerCase();
|
||||
@@ -175,21 +156,20 @@ public class DataStreamJob {
|
||||
Token token1 = tokenCache.get(token1Addr);
|
||||
|
||||
if (token0 != null && token1 != null) {
|
||||
SwapEventLog swapLog = event.getSwapEvent();
|
||||
FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1);
|
||||
out.collect(fullSwap);
|
||||
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
|
||||
out.collect(swapWithMetadata);
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Apply SwapEnricher to create final Swap objects with calculated prices
|
||||
DataStream<Swap> enrichedSwaps = fullyElaboratedSwaps
|
||||
.map(new SwapEnricher());
|
||||
// Apply SwapElaborator to create final Swap objects with calculated prices
|
||||
DataStream<Swap> swaps = swapsWithTokens
|
||||
.map(new SwapElaborator());
|
||||
|
||||
// Print the final enriched swap objects
|
||||
enrichedSwaps
|
||||
swaps
|
||||
.map(swap -> {
|
||||
try {
|
||||
String json = mapper.writeValueAsString(swap);
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
package stream.dto;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class ElaboratedSwapEvent implements Serializable {
|
||||
private SwapEventLog swapEvent;
|
||||
private String token0Address;
|
||||
private String token1Address;
|
||||
|
||||
public ElaboratedSwapEvent() {
|
||||
}
|
||||
|
||||
public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) {
|
||||
this.swapEvent = swapEvent;
|
||||
this.token0Address = token0Address;
|
||||
this.token1Address = token1Address;
|
||||
}
|
||||
|
||||
public SwapEventLog getSwapEvent() {
|
||||
return swapEvent;
|
||||
}
|
||||
|
||||
public void setSwapEvent(SwapEventLog swapEvent) {
|
||||
this.swapEvent = swapEvent;
|
||||
}
|
||||
|
||||
public String getToken0Address() {
|
||||
return token0Address;
|
||||
}
|
||||
|
||||
public void setToken0Address(String token0Address) {
|
||||
this.token0Address = token0Address;
|
||||
}
|
||||
|
||||
public String getToken1Address() {
|
||||
return token1Address;
|
||||
}
|
||||
|
||||
public void setToken1Address(String token1Address) {
|
||||
this.token1Address = token1Address;
|
||||
}
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
package stream.dto;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class FullyElaboratedSwap implements Serializable {
|
||||
private ElaboratedSwapEvent swapEvent;
|
||||
private Token token0;
|
||||
private Token token1;
|
||||
|
||||
public FullyElaboratedSwap() {
|
||||
}
|
||||
|
||||
public FullyElaboratedSwap(ElaboratedSwapEvent swapEvent, Token token0, Token token1) {
|
||||
this.swapEvent = swapEvent;
|
||||
this.token0 = token0;
|
||||
this.token1 = token1;
|
||||
}
|
||||
|
||||
public ElaboratedSwapEvent getSwapEvent() {
|
||||
return swapEvent;
|
||||
}
|
||||
|
||||
public void setSwapEvent(ElaboratedSwapEvent swapEvent) {
|
||||
this.swapEvent = swapEvent;
|
||||
}
|
||||
|
||||
public Token getToken0() {
|
||||
return token0;
|
||||
}
|
||||
|
||||
public void setToken0(Token token0) {
|
||||
this.token0 = token0;
|
||||
}
|
||||
|
||||
public Token getToken1() {
|
||||
return token1;
|
||||
}
|
||||
|
||||
public void setToken1(Token token1) {
|
||||
this.token1 = token1;
|
||||
}
|
||||
}
|
||||
@@ -1,80 +0,0 @@
|
||||
package stream.io;
|
||||
|
||||
import stream.dto.SwapEventLog;
|
||||
import stream.dto.ElaboratedSwapEvent;
|
||||
import stream.contract.UniswapV3Pool;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import org.web3j.protocol.Web3j;
|
||||
import org.web3j.protocol.http.HttpService;
|
||||
import org.web3j.crypto.Credentials;
|
||||
import org.web3j.tx.gas.DefaultGasProvider;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.Collections;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.Map;
|
||||
|
||||
public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSwapEvent> {
|
||||
private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class);
|
||||
private transient Web3j web3j;
|
||||
private transient Credentials credentials;
|
||||
private transient DefaultGasProvider gasProvider;
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
super.open(openContext);
|
||||
|
||||
// Get RPC URL from job parameters
|
||||
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
|
||||
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||
// TODO: Get from configuration if needed
|
||||
|
||||
// Initialize Web3j
|
||||
this.web3j = Web3j.build(new HttpService(rpcUrl));
|
||||
|
||||
// Dummy credentials for read-only operations
|
||||
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
|
||||
|
||||
// Default gas provider
|
||||
this.gasProvider = new DefaultGasProvider();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(SwapEventLog swap, ResultFuture<ElaboratedSwapEvent> resultFuture) throws Exception {
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
// Load the pool contract
|
||||
UniswapV3Pool pool = UniswapV3Pool.load(
|
||||
swap.getAddress(), // Pool address from the event
|
||||
web3j,
|
||||
credentials,
|
||||
gasProvider
|
||||
);
|
||||
|
||||
// Get token addresses
|
||||
String token0 = pool.token0().send();
|
||||
String token1 = pool.token1().send();
|
||||
|
||||
// Create enriched event
|
||||
return new ElaboratedSwapEvent(swap, token0, token1);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Error fetching pool tokens", e);
|
||||
// Return original without enrichment
|
||||
return new ElaboratedSwapEvent(swap, null, null);
|
||||
}
|
||||
}).thenAccept(enriched -> {
|
||||
resultFuture.complete(Collections.singletonList(enriched));
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (web3j != null) {
|
||||
web3j.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,94 +0,0 @@
|
||||
|
||||
package stream.io;
|
||||
|
||||
import org.apache.flink.api.common.functions.RichMapFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import stream.dto.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.math.MathContext;
|
||||
import java.math.RoundingMode;
|
||||
|
||||
public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
|
||||
private static final Logger log = LoggerFactory.getLogger(SwapEnricher.class);
|
||||
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
|
||||
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
|
||||
|
||||
@Override
|
||||
public Swap map(FullyElaboratedSwap fullSwap) throws Exception {
|
||||
ElaboratedSwapEvent event = fullSwap.getSwapEvent();
|
||||
SwapEventLog swapLog = event.getSwapEvent();
|
||||
Token token0 = fullSwap.getToken0();
|
||||
Token token1 = fullSwap.getToken1();
|
||||
|
||||
// For now, hardcode exchange as UNISWAP_V3 and time as current time
|
||||
Long time = System.currentTimeMillis();
|
||||
Exchange exchange = Exchange.UNISWAP_V3;
|
||||
|
||||
// Pool address
|
||||
String pool = swapLog.address;
|
||||
|
||||
// Get sqrtPriceX96 and calculate actual price
|
||||
BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96);
|
||||
BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT);
|
||||
BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT);
|
||||
|
||||
// Adjust price for decimals (price is token1/token0)
|
||||
int decimalDiff = token0.decimals - token1.decimals;
|
||||
BigDecimal adjustedPrice;
|
||||
if (decimalDiff >= 0) {
|
||||
BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff);
|
||||
adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT);
|
||||
} else {
|
||||
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
|
||||
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
|
||||
}
|
||||
|
||||
// Determine which token is in and which is out based on amount signs
|
||||
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
|
||||
|
||||
String takerAsset;
|
||||
String makerAsset;
|
||||
BigDecimal amountIn;
|
||||
BigDecimal amountOut;
|
||||
BigDecimal finalPrice;
|
||||
|
||||
if (isToken0In) {
|
||||
// User is sending token0, receiving token1
|
||||
takerAsset = event.getToken0Address();
|
||||
makerAsset = event.getToken1Address();
|
||||
|
||||
// Convert amounts to human-readable format using decimals
|
||||
amountIn = new BigDecimal(swapLog.amount0.abs())
|
||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||
amountOut = new BigDecimal(swapLog.amount1)
|
||||
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
|
||||
|
||||
// Price is how much token1 you get per token0
|
||||
finalPrice = adjustedPrice;
|
||||
} else {
|
||||
// User is sending token1, receiving token0
|
||||
takerAsset = event.getToken1Address();
|
||||
makerAsset = event.getToken0Address();
|
||||
|
||||
// Convert amounts to human-readable format using decimals
|
||||
amountIn = new BigDecimal(swapLog.amount1.abs())
|
||||
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
|
||||
amountOut = new BigDecimal(swapLog.amount0)
|
||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||
|
||||
// Price is how much token0 you get per token1 (inverse of adjustedPrice)
|
||||
finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
|
||||
}
|
||||
|
||||
log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
|
||||
pool, token0.symbol + "/" + token1.symbol,
|
||||
isToken0In ? token1.symbol : token0.symbol,
|
||||
amountIn, amountOut, finalPrice);
|
||||
|
||||
// Pass both amountIn and amountOut to constructor with unit price
|
||||
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user