diff --git "a/eval \"$(ssh-agent -s)\"" "b/eval \"$(ssh-agent -s)\"" deleted file mode 100644 index 3857e1c..0000000 --- "a/eval \"$(ssh-agent -s)\"" +++ /dev/null @@ -1,8 +0,0 @@ ------BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABBr7BKqJe -BOIqEijWReA7+HAAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIM9dTaag/jewNYcY -416Fg7f3KjRUaaQ5x44CAWLsZtz4AAAAoCvYBgMQ51N0rrqc1FGsatMFbY+DQxyu4KDSJM -dH0Gr4m/B+3jyxHgQ+RmwRpoGk80Ew4zN6RIrsVZf8chfCybfyxFd/WuxvBIf4f3SVsu8z -k1WZ50YQvG5eATRzrRNDlcDxyyyLeSjn7Au2wEYYVPgzqvO5MzH+/B5PWItaVdb7fGUQ1W -xPRd+ZdJgZ0pIEPmjU440qG06hgYFTS0ywNv0= ------END OPENSSH PRIVATE KEY----- diff --git "a/eval \"$(ssh-agent -s)\".pub" "b/eval \"$(ssh-agent -s)\".pub" deleted file mode 100644 index edbbb8e..0000000 --- "a/eval \"$(ssh-agent -s)\".pub" +++ /dev/null @@ -1 +0,0 @@ -ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIM9dTaag/jewNYcY416Fg7f3KjRUaaQ5x44CAWLsZtz4 surbhi.jhavar@gmail.com diff --git a/src/main/java/stream/dto/FullyElaboratedSwap.java b/src/main/java/stream/dto/FullyElaboratedSwap.java new file mode 100644 index 0000000..8237abc --- /dev/null +++ b/src/main/java/stream/dto/FullyElaboratedSwap.java @@ -0,0 +1,42 @@ +package stream.dto; + +import java.io.Serializable; + +public class FullyElaboratedSwap implements Serializable { + private ElaboratedSwapEvent swapEvent; + private Token token0; + private Token token1; + + public FullyElaboratedSwap() { + } + + public FullyElaboratedSwap(ElaboratedSwapEvent swapEvent, Token token0, Token token1) { + this.swapEvent = swapEvent; + this.token0 = token0; + this.token1 = token1; + } + + public ElaboratedSwapEvent getSwapEvent() { + return swapEvent; + } + + public void setSwapEvent(ElaboratedSwapEvent swapEvent) { + this.swapEvent = swapEvent; + } + + public Token getToken0() { + return token0; + } + + public void setToken0(Token token0) { + this.token0 = token0; + } + + public Token getToken1() { + return token1; + } + + public void setToken1(Token token1) { + this.token1 = token1; + } +} \ No newline at end of file diff --git a/src/main/java/stream/io/SwapEnricher.java b/src/main/java/stream/io/SwapEnricher.java new file mode 100644 index 0000000..b2f39d6 --- /dev/null +++ b/src/main/java/stream/io/SwapEnricher.java @@ -0,0 +1,94 @@ + +package stream.io; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import stream.dto.*; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.MathContext; +import java.math.RoundingMode; + +public class SwapEnricher extends RichMapFunction { + private static final Logger log = LoggerFactory.getLogger(SwapEnricher.class); + private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP); + private static final BigDecimal Q96 = new BigDecimal(2).pow(96); + + @Override + public Swap map(FullyElaboratedSwap fullSwap) throws Exception { + ElaboratedSwapEvent event = fullSwap.getSwapEvent(); + SwapEventLog swapLog = event.getSwapEvent(); + Token token0 = fullSwap.getToken0(); + Token token1 = fullSwap.getToken1(); + + // For now, hardcode exchange as UNISWAP_V3 and time as current time + Long time = System.currentTimeMillis(); + Exchange exchange = Exchange.UNISWAP_V3; + + // Pool address + String pool = swapLog.address; + + // Get sqrtPriceX96 and calculate actual price + BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96); + BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT); + BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT); + + // Adjust price for decimals (price is token1/token0) + int decimalDiff = token0.decimals - token1.decimals; + BigDecimal adjustedPrice; + if (decimalDiff >= 0) { + BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff); + adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT); + } else { + BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff); + adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT); + } + + // Determine which token is in and which is out based on amount signs + boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0; + + String takerAsset; + String makerAsset; + BigDecimal amountIn; + BigDecimal amountOut; + BigDecimal finalPrice; + + if (isToken0In) { + // User is sending token0, receiving token1 + takerAsset = event.getToken0Address(); + makerAsset = event.getToken1Address(); + + // Convert amounts to human-readable format using decimals + amountIn = new BigDecimal(swapLog.amount0.abs()) + .divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT); + amountOut = new BigDecimal(swapLog.amount1) + .divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT); + + // Price is how much token1 you get per token0 + finalPrice = adjustedPrice; + } else { + // User is sending token1, receiving token0 + takerAsset = event.getToken1Address(); + makerAsset = event.getToken0Address(); + + // Convert amounts to human-readable format using decimals + amountIn = new BigDecimal(swapLog.amount1.abs()) + .divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT); + amountOut = new BigDecimal(swapLog.amount0) + .divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT); + + // Price is how much token0 you get per token1 (inverse of adjustedPrice) + finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT); + } + + log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}", + pool, token0.symbol + "/" + token1.symbol, + isToken0In ? token1.symbol : token0.symbol, + amountIn, amountOut, finalPrice); + + // Pass both amountIn and amountOut to constructor with unit price + return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice); + } +} \ No newline at end of file