removing price inversion in the swap ellaborator since we always want to represent the vlaue of the swap in its quote curency
This commit is contained in:
@@ -33,6 +33,7 @@ import stream.io.PoolTokenIdElaborator;
|
|||||||
import stream.io.TokenElaborator;
|
import stream.io.TokenElaborator;
|
||||||
import stream.io.SwapElaborator;
|
import stream.io.SwapElaborator;
|
||||||
import stream.io.BlockTimestampElaborator;
|
import stream.io.BlockTimestampElaborator;
|
||||||
|
import stream.ohlc.OHLCPipeline;
|
||||||
import stream.source.eventlog.EventLogSourceFactory;
|
import stream.source.eventlog.EventLogSourceFactory;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@@ -84,7 +85,7 @@ public class DataStreamJob {
|
|||||||
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
|
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
|
||||||
swapStream,
|
swapStream,
|
||||||
new BlockTimestampElaborator(),
|
new BlockTimestampElaborator(),
|
||||||
30,
|
120,
|
||||||
TimeUnit.SECONDS
|
TimeUnit.SECONDS
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -168,6 +169,33 @@ public class DataStreamJob {
|
|||||||
DataStream<Swap> swaps = swapsWithTokens
|
DataStream<Swap> swaps = swapsWithTokens
|
||||||
.map(new SwapElaborator());
|
.map(new SwapElaborator());
|
||||||
|
|
||||||
|
// Test OHLC with USDC/WETH pool
|
||||||
|
OHLCPipeline ohlcPipeline = new OHLCPipeline();
|
||||||
|
DataStream<OHLCCandle> allOhlc = ohlcPipeline.createOHLCStream(swaps);
|
||||||
|
|
||||||
|
// Filter and print OHLC candles for USDC/WETH pool only
|
||||||
|
allOhlc
|
||||||
|
.filter(candle -> {
|
||||||
|
// Filter for specific USDC/WETH pool
|
||||||
|
String poolAddress = candle.getPool().toLowerCase();
|
||||||
|
String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase();
|
||||||
|
return poolAddress.equals(targetPool);
|
||||||
|
})
|
||||||
|
.map(candle -> {
|
||||||
|
System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() +
|
||||||
|
" Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() +
|
||||||
|
" Trades=" + candle.getTradeCount() +
|
||||||
|
" Open=" + candle.getOpen() +
|
||||||
|
" High=" + candle.getHigh() +
|
||||||
|
" Low=" + candle.getLow() +
|
||||||
|
" Close=" + candle.getClose() +
|
||||||
|
" Volume=" + candle.getVolume());
|
||||||
|
return candle;
|
||||||
|
})
|
||||||
|
.setParallelism(1)
|
||||||
|
.returns(OHLCCandle.class)
|
||||||
|
.print("USDC-WETH-OHLC");
|
||||||
|
|
||||||
// Print the final enriched swap objects
|
// Print the final enriched swap objects
|
||||||
swaps
|
swaps
|
||||||
.map(swap -> {
|
.map(swap -> {
|
||||||
|
|||||||
@@ -45,10 +45,14 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
|
|||||||
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
|
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
|
||||||
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
|
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if this is a WETH/USDC swap for debugging
|
||||||
|
boolean isWethUsdcPool = false;
|
||||||
|
String token0Lower = event.getToken0Address().toLowerCase();
|
||||||
|
String token1Lower = event.getToken1Address().toLowerCase();
|
||||||
|
|
||||||
// Determine which token is in and which is out based on amount signs
|
// Determine which token is in and which is out based on amount signs
|
||||||
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
|
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
|
||||||
|
|
||||||
String takerAsset;
|
String takerAsset;
|
||||||
String makerAsset;
|
String makerAsset;
|
||||||
BigDecimal amountIn;
|
BigDecimal amountIn;
|
||||||
@@ -59,7 +63,6 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
|
|||||||
// User is sending token0, receiving token1
|
// User is sending token0, receiving token1
|
||||||
takerAsset = event.getToken0Address();
|
takerAsset = event.getToken0Address();
|
||||||
makerAsset = event.getToken1Address();
|
makerAsset = event.getToken1Address();
|
||||||
|
|
||||||
// Convert amounts to human-readable format using decimals
|
// Convert amounts to human-readable format using decimals
|
||||||
amountIn = new BigDecimal(swapLog.amount0.abs())
|
amountIn = new BigDecimal(swapLog.amount0.abs())
|
||||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||||
@@ -79,15 +82,12 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
|
|||||||
amountOut = new BigDecimal(swapLog.amount0)
|
amountOut = new BigDecimal(swapLog.amount0)
|
||||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||||
|
|
||||||
// Price is how much token0 you get per token1 (inverse of adjustedPrice)
|
// We need to calculate price as token1/token0 to be consistent with token0 as quote currency
|
||||||
finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
|
// Currently adjustedPrice = token1/token0, so we keep it as is
|
||||||
|
// This ensures price always represents how many token0 (quote) per 1 token1 (base)
|
||||||
|
// For example, in WETH/USDC: price = 2000 means 1 WETH costs 2000 USDC
|
||||||
|
finalPrice = adjustedPrice;
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
|
|
||||||
pool, token0.symbol + "/" + token1.symbol,
|
|
||||||
isToken0In ? token1.symbol : token0.symbol,
|
|
||||||
amountIn, amountOut, finalPrice);
|
|
||||||
|
|
||||||
// Pass both amountIn and amountOut to constructor with unit price
|
// Pass both amountIn and amountOut to constructor with unit price
|
||||||
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
|
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user