From 0d8df3df9c0d771c53a7c1863ea58a69de0f1b47 Mon Sep 17 00:00:00 2001 From: surbhi Date: Wed, 8 Oct 2025 13:21:12 -0400 Subject: [PATCH] adding renamed elaborators --- .../stream/dto/SwapEventWithTokenIds.java | 59 ++++++++++++ .../dto/SwapEventWithTokenMetadata.java | 42 +++++++++ .../java/stream/dto/SwapWithTimestamp.java | 24 +++++ .../stream/io/BlockTimestampElaborator.java | 80 ++++++++++++++++ .../java/stream/io/PoolTokenIdElaborator.java | 81 ++++++++++++++++ src/main/java/stream/io/SwapElaborator.java | 94 +++++++++++++++++++ 6 files changed, 380 insertions(+) create mode 100644 src/main/java/stream/dto/SwapEventWithTokenIds.java create mode 100644 src/main/java/stream/dto/SwapEventWithTokenMetadata.java create mode 100644 src/main/java/stream/dto/SwapWithTimestamp.java create mode 100644 src/main/java/stream/io/BlockTimestampElaborator.java create mode 100644 src/main/java/stream/io/PoolTokenIdElaborator.java create mode 100644 src/main/java/stream/io/SwapElaborator.java diff --git a/src/main/java/stream/dto/SwapEventWithTokenIds.java b/src/main/java/stream/dto/SwapEventWithTokenIds.java new file mode 100644 index 0000000..b488e90 --- /dev/null +++ b/src/main/java/stream/dto/SwapEventWithTokenIds.java @@ -0,0 +1,59 @@ +package stream.dto; + +import java.io.Serializable; +import java.math.BigInteger; + +public class SwapEventWithTokenIds implements Serializable { + private SwapEventLog swapEvent; + private String token0Address; + private String token1Address; + private BigInteger timestamp; + + public SwapEventWithTokenIds() { + } + + public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address) { + this.swapEvent = swapEvent; + this.token0Address = token0Address; + this.token1Address = token1Address; + } + + public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address, BigInteger timestamp) { + this.swapEvent = swapEvent; + this.token0Address = token0Address; + this.token1Address = token1Address; + this.timestamp = timestamp; + } + + public SwapEventLog getSwapEvent() { + return swapEvent; + } + + public void setSwapEvent(SwapEventLog swapEvent) { + this.swapEvent = swapEvent; + } + + public String getToken0Address() { + return token0Address; + } + + public void setToken0Address(String token0Address) { + this.token0Address = token0Address; + } + + public String getToken1Address() { + return token1Address; + } + + public void setToken1Address(String token1Address) { + this.token1Address = token1Address; + } + + public BigInteger getTimestamp() { + return timestamp; + } + + public void setTimestamp(BigInteger timestamp) { + this.timestamp = timestamp; + } +} \ No newline at end of file diff --git a/src/main/java/stream/dto/SwapEventWithTokenMetadata.java b/src/main/java/stream/dto/SwapEventWithTokenMetadata.java new file mode 100644 index 0000000..92f6285 --- /dev/null +++ b/src/main/java/stream/dto/SwapEventWithTokenMetadata.java @@ -0,0 +1,42 @@ +package stream.dto; + +import java.io.Serializable; + +public class SwapEventWithTokenMetadata implements Serializable { + private SwapEventWithTokenIds swapEvent; + private Token token0; + private Token token1; + + public SwapEventWithTokenMetadata() { + } + + public SwapEventWithTokenMetadata(SwapEventWithTokenIds swapEvent, Token token0, Token token1) { + this.swapEvent = swapEvent; + this.token0 = token0; + this.token1 = token1; + } + + public SwapEventWithTokenIds getSwapEvent() { + return swapEvent; + } + + public void setSwapEvent(SwapEventWithTokenIds swapEvent) { + this.swapEvent = swapEvent; + } + + public Token getToken0() { + return token0; + } + + public void setToken0(Token token0) { + this.token0 = token0; + } + + public Token getToken1() { + return token1; + } + + public void setToken1(Token token1) { + this.token1 = token1; + } +} \ No newline at end of file diff --git a/src/main/java/stream/dto/SwapWithTimestamp.java b/src/main/java/stream/dto/SwapWithTimestamp.java new file mode 100644 index 0000000..b7ba088 --- /dev/null +++ b/src/main/java/stream/dto/SwapWithTimestamp.java @@ -0,0 +1,24 @@ +package stream.dto; + +import java.io.Serializable; +import java.math.BigInteger; + +public class SwapWithTimestamp implements Serializable { + private static final long serialVersionUID = 1L; + + private final SwapEventLog swapEvent; + private final BigInteger timestamp; + + public SwapWithTimestamp(SwapEventLog swapEvent, BigInteger timestamp) { + this.swapEvent = swapEvent; + this.timestamp = timestamp; + } + + public SwapEventLog getSwapEvent() { + return swapEvent; + } + + public BigInteger getTimestamp() { + return timestamp; + } +} \ No newline at end of file diff --git a/src/main/java/stream/io/BlockTimestampElaborator.java b/src/main/java/stream/io/BlockTimestampElaborator.java new file mode 100644 index 0000000..61a87c2 --- /dev/null +++ b/src/main/java/stream/io/BlockTimestampElaborator.java @@ -0,0 +1,80 @@ +package stream.io; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.web3j.protocol.core.methods.response.EthBlock; +import stream.dto.BlockHash; +import stream.dto.SwapEventLog; +import stream.dto.SwapWithTimestamp; + +import java.math.BigInteger; +import java.util.Collections; + +public class BlockTimestampElaborator extends RichAsyncFunction { + private static final Logger log = LoggerFactory.getLogger(BlockTimestampElaborator.class); + private transient BlockElaborator blockElaborator; + + @Override + public void open(OpenContext openContext) throws Exception { + blockElaborator = new BlockElaborator(); + blockElaborator.setRuntimeContext(getRuntimeContext()); + blockElaborator.open(openContext); + } + + @Override + public void asyncInvoke(SwapEventLog swapEvent, ResultFuture resultFuture) { + try { + BlockHash blockHash = new BlockHash(); + blockHash.hash = swapEvent.blockHash; + + // Create a custom ResultFuture to handle the block elaboration result + ResultFuture blockResultFuture = new ResultFuture() { + @Override + public void complete(java.util.Collection result) { + try { + if (!result.isEmpty()) { + EthBlock ethBlock = result.iterator().next(); + BigInteger timestamp = ethBlock.getBlock().getTimestamp(); + SwapWithTimestamp swapWithTimestamp = new SwapWithTimestamp(swapEvent, timestamp); + resultFuture.complete(Collections.singleton(swapWithTimestamp)); + } else { + log.error("No block found for block hash {}", swapEvent.blockHash); + resultFuture.completeExceptionally(new RuntimeException("No block found")); + } + } catch (Exception e) { + log.error("Failed to get timestamp for block {} of swap in tx {}", + swapEvent.blockHash, swapEvent.transactionHash, e); + resultFuture.completeExceptionally(new RuntimeException("Error getting block timestamp", e)); + } + } + + @Override + public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier collectionSupplier) { + try { + complete(collectionSupplier.get()); + } catch (Exception e) { + completeExceptionally(e); + } + } + + @Override + public void completeExceptionally(Throwable error) { + log.error("Failed to get block {} for swap in tx {}", + swapEvent.blockHash, swapEvent.transactionHash, error); + resultFuture.completeExceptionally(error); + } + }; + + // Invoke the block elaborator asynchronously + blockElaborator.asyncInvoke(blockHash, blockResultFuture); + + } catch (Exception e) { + log.error("Error in BlockTimestampElaborator for swap in tx {}", + swapEvent.transactionHash, e); + resultFuture.completeExceptionally(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/stream/io/PoolTokenIdElaborator.java b/src/main/java/stream/io/PoolTokenIdElaborator.java new file mode 100644 index 0000000..8d261e3 --- /dev/null +++ b/src/main/java/stream/io/PoolTokenIdElaborator.java @@ -0,0 +1,81 @@ +package stream.io; + +import stream.dto.SwapWithTimestamp; +import stream.dto.SwapEventWithTokenIds; +import stream.contract.UniswapV3Pool; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.web3j.protocol.Web3j; +import org.web3j.protocol.http.HttpService; +import org.web3j.crypto.Credentials; +import org.web3j.tx.gas.DefaultGasProvider; +import java.util.concurrent.CompletableFuture; +import java.util.Collections; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Map; + +public class PoolTokenIdElaborator extends RichAsyncFunction { + private static final Logger log = LoggerFactory.getLogger(PoolTokenIdElaborator.class); + private transient Web3j web3j; + private transient Credentials credentials; + private transient DefaultGasProvider gasProvider; + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + + // Get RPC URL from job parameters + Map params = getRuntimeContext().getGlobalJobParameters(); + String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545"); + // TODO: Get from configuration if needed + + // Initialize Web3j + this.web3j = Web3j.build(new HttpService(rpcUrl)); + + // Dummy credentials for read-only operations + this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001"); + + // Default gas provider + this.gasProvider = new DefaultGasProvider(); + } + + @Override + public void asyncInvoke(SwapWithTimestamp swapWithTimestamp, ResultFuture resultFuture) throws Exception { + CompletableFuture.supplyAsync(() -> { + try { + // Load the pool contract + UniswapV3Pool pool = UniswapV3Pool.load( + swapWithTimestamp.getSwapEvent().getAddress(), // Pool address from the event + web3j, + credentials, + gasProvider + ); + + // Get token addresses + String token0 = pool.token0().send(); + String token1 = pool.token1().send(); + + // Create enriched event with timestamp + return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), token0, token1, swapWithTimestamp.getTimestamp()); + + } catch (Exception e) { + log.error("Error fetching pool tokens for swap in pool {}", + swapWithTimestamp.getSwapEvent().getAddress(), e); + // Return original without enrichment but with timestamp + return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), null, null, swapWithTimestamp.getTimestamp()); + } + }).thenAccept(enriched -> { + resultFuture.complete(Collections.singletonList(enriched)); + }); + } + + @Override + public void close() throws Exception { + if (web3j != null) { + web3j.shutdown(); + } + } +} \ No newline at end of file diff --git a/src/main/java/stream/io/SwapElaborator.java b/src/main/java/stream/io/SwapElaborator.java new file mode 100644 index 0000000..f7c67e7 --- /dev/null +++ b/src/main/java/stream/io/SwapElaborator.java @@ -0,0 +1,94 @@ + +package stream.io; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import stream.dto.*; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.MathContext; +import java.math.RoundingMode; + +public class SwapElaborator extends RichMapFunction { + private static final Logger log = LoggerFactory.getLogger(SwapElaborator.class); + private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP); + private static final BigDecimal Q96 = new BigDecimal(2).pow(96); + + @Override + public Swap map(SwapEventWithTokenMetadata swapWithMetadata) throws Exception { + SwapEventWithTokenIds event = swapWithMetadata.getSwapEvent(); + SwapEventLog swapLog = event.getSwapEvent(); + Token token0 = swapWithMetadata.getToken0(); + Token token1 = swapWithMetadata.getToken1(); + + // Use timestamp from block elaboration and set exchange as UNISWAP_V3 + Long time = event.getTimestamp().longValue(); + Exchange exchange = Exchange.UNISWAP_V3; + + // Pool address + String pool = swapLog.address; + + // Get sqrtPriceX96 and calculate actual price + BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96); + BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT); + BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT); + + // Adjust price for decimals (price is token1/token0) + int decimalDiff = token0.decimals - token1.decimals; + BigDecimal adjustedPrice; + if (decimalDiff >= 0) { + BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff); + adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT); + } else { + BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff); + adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT); + } + + // Determine which token is in and which is out based on amount signs + boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0; + + String takerAsset; + String makerAsset; + BigDecimal amountIn; + BigDecimal amountOut; + BigDecimal finalPrice; + + if (isToken0In) { + // User is sending token0, receiving token1 + takerAsset = event.getToken0Address(); + makerAsset = event.getToken1Address(); + + // Convert amounts to human-readable format using decimals + amountIn = new BigDecimal(swapLog.amount0.abs()) + .divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT); + amountOut = new BigDecimal(swapLog.amount1) + .divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT); + + // Price is how much token1 you get per token0 + finalPrice = adjustedPrice; + } else { + // User is sending token1, receiving token0 + takerAsset = event.getToken1Address(); + makerAsset = event.getToken0Address(); + + // Convert amounts to human-readable format using decimals + amountIn = new BigDecimal(swapLog.amount1.abs()) + .divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT); + amountOut = new BigDecimal(swapLog.amount0) + .divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT); + + // Price is how much token0 you get per token1 (inverse of adjustedPrice) + finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT); + } + + log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}", + pool, token0.symbol + "/" + token1.symbol, + isToken0In ? token1.symbol : token0.symbol, + amountIn, amountOut, finalPrice); + + // Pass both amountIn and amountOut to constructor with unit price + return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice); + } +} \ No newline at end of file