Compare commits

..

2 Commits

6 changed files with 261 additions and 12 deletions

View File

@@ -33,6 +33,7 @@ import stream.io.PoolTokenIdElaborator;
import stream.io.TokenElaborator;
import stream.io.SwapElaborator;
import stream.io.BlockTimestampElaborator;
import stream.ohlc.OHLCPipeline;
import stream.source.eventlog.EventLogSourceFactory;
import java.util.HashMap;
@@ -84,7 +85,7 @@ public class DataStreamJob {
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
swapStream,
new BlockTimestampElaborator(),
30,
120,
TimeUnit.SECONDS
);
@@ -168,6 +169,33 @@ public class DataStreamJob {
DataStream<Swap> swaps = swapsWithTokens
.map(new SwapElaborator());
// Test OHLC with USDC/WETH pool
OHLCPipeline ohlcPipeline = new OHLCPipeline();
DataStream<OHLCCandle> allOhlc = ohlcPipeline.createOHLCStream(swaps);
// Filter and print OHLC candles for USDC/WETH pool only
allOhlc
.filter(candle -> {
// Filter for specific USDC/WETH pool
String poolAddress = candle.getPool().toLowerCase();
String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase();
return poolAddress.equals(targetPool);
})
.map(candle -> {
System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() +
" Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() +
" Trades=" + candle.getTradeCount() +
" Open=" + candle.getOpen() +
" High=" + candle.getHigh() +
" Low=" + candle.getLow() +
" Close=" + candle.getClose() +
" Volume=" + candle.getVolume());
return candle;
})
.setParallelism(1)
.returns(OHLCCandle.class)
.print("USDC-WETH-OHLC");
// Print the final enriched swap objects
swaps
.map(swap -> {

View File

@@ -0,0 +1,56 @@
package stream.dto;
import java.math.BigDecimal;
public class OHLCCandle {
private final String pool;
private final String token0;
private final String token1;
private final long windowStart;
private final long windowEnd;
private final BigDecimal open;
private final BigDecimal high;
private final BigDecimal low;
private final BigDecimal close;
private final BigDecimal volume;
private final int tradeCount;
public OHLCCandle(String pool, String token0, String token1,
long windowStart, long windowEnd,
BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close,
BigDecimal volume, int tradeCount) {
this.pool = pool;
this.token0 = token0;
this.token1 = token1;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.open = open;
this.high = high;
this.low = low;
this.close = close;
this.volume = volume;
this.tradeCount = tradeCount;
}
// Getters
public String getPool() { return pool; }
public String getToken0() { return token0; }
public String getToken1() { return token1; }
public long getWindowStart() { return windowStart; }
public long getWindowEnd() { return windowEnd; }
public BigDecimal getOpen() { return open; }
public BigDecimal getHigh() { return high; }
public BigDecimal getLow() { return low; }
public BigDecimal getClose() { return close; }
public BigDecimal getVolume() { return volume; }
public int getTradeCount() { return tradeCount; }
@Override
public String toString() {
return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d",
pool.substring(0, 8) + "...",
token0.substring(0, 6), token1.substring(0, 6),
windowStart, windowEnd,
open, high, low, close, volume, tradeCount);
}
}

View File

@@ -45,10 +45,14 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
}
// Check if this is a WETH/USDC swap for debugging
boolean isWethUsdcPool = false;
String token0Lower = event.getToken0Address().toLowerCase();
String token1Lower = event.getToken1Address().toLowerCase();
// Determine which token is in and which is out based on amount signs
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
String takerAsset;
String makerAsset;
BigDecimal amountIn;
@@ -59,7 +63,6 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
// User is sending token0, receiving token1
takerAsset = event.getToken0Address();
makerAsset = event.getToken1Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount0.abs())
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
@@ -79,15 +82,12 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
amountOut = new BigDecimal(swapLog.amount0)
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
// Price is how much token0 you get per token1 (inverse of adjustedPrice)
finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
// We need to calculate price as token1/token0 to be consistent with token0 as quote currency
// Currently adjustedPrice = token1/token0, so we keep it as is
// This ensures price always represents how many token0 (quote) per 1 token1 (base)
// For example, in WETH/USDC: price = 2000 means 1 WETH costs 2000 USDC
finalPrice = adjustedPrice;
}
log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
pool, token0.symbol + "/" + token1.symbol,
isToken0In ? token1.symbol : token0.symbol,
amountIn, amountOut, finalPrice);
// Pass both amountIn and amountOut to constructor with unit price
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
}

View File

@@ -0,0 +1,41 @@
package stream.ohlc;
import java.math.BigDecimal;
public class OHLCAccumulator {
public String pool = null;
public String token0 = null; // Added
public String token1 = null; // Added
public BigDecimal open = null;
public BigDecimal high = null;
public BigDecimal low = null;
public BigDecimal close = null;
public BigDecimal volume = BigDecimal.ZERO;
public int tradeCount = 0;
public long firstTradeTime = 0;
public long lastTradeTime = 0;
public OHLCAccumulator() {}
public void updatePrice(BigDecimal price, long timestamp) {
if (open == null) {
open = price;
high = price;
low = price;
firstTradeTime = timestamp;
} else {
high = high.max(price);
low = low.min(price);
}
close = price;
lastTradeTime = timestamp;
}
public void addVolume(BigDecimal amount) {
volume = volume.add(amount);
}
public void incrementTradeCount() {
tradeCount++;
}
}

View File

@@ -0,0 +1,98 @@
package stream.ohlc;
import org.apache.flink.api.common.functions.AggregateFunction;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
import java.math.BigDecimal;
public class OHLCAggregator implements AggregateFunction<Swap, OHLCAccumulator, OHLCCandle> {
private final long windowSize = 60; // 1 minute in seconds
@Override
public OHLCAccumulator createAccumulator() {
return new OHLCAccumulator();
}
@Override
public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) {
// Initialize pool and tokens on first swap
if (accumulator.pool == null) {
accumulator.pool = swap.getPool();
// Store tokens in consistent order (you could sort by address)
accumulator.token0 = swap.getTakerAsset();
accumulator.token1 = swap.getMakerAsset();
}
// Update OHLC prices
accumulator.updatePrice(swap.getPrice(), swap.getTime());
// Calculate volume (using the taker's input amount as volume)
BigDecimal volume = swap.getAmountIn().abs();
accumulator.addVolume(volume);
// Increment trade count
accumulator.incrementTradeCount();
return accumulator;
}
@Override
public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) {
OHLCAccumulator merged = new OHLCAccumulator();
// Merge pool and token info
merged.pool = acc1.pool != null ? acc1.pool : acc2.pool;
merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0;
merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1;
// Merge OHLC data
if (acc1.open != null && acc2.open != null) {
merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open;
merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close;
merged.high = acc1.high.max(acc2.high);
merged.low = acc1.low.min(acc2.low);
merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime);
merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime);
} else if (acc1.open != null) {
merged.open = acc1.open;
merged.close = acc1.close;
merged.high = acc1.high;
merged.low = acc1.low;
merged.firstTradeTime = acc1.firstTradeTime;
merged.lastTradeTime = acc1.lastTradeTime;
} else if (acc2.open != null) {
merged.open = acc2.open;
merged.close = acc2.close;
merged.high = acc2.high;
merged.low = acc2.low;
merged.firstTradeTime = acc2.firstTradeTime;
merged.lastTradeTime = acc2.lastTradeTime;
}
merged.volume = acc1.volume.add(acc2.volume);
merged.tradeCount = acc1.tradeCount + acc2.tradeCount;
return merged;
}
@Override
public OHLCCandle getResult(OHLCAccumulator accumulator) {
long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize;
long windowEnd = windowStart + windowSize;
return new OHLCCandle(
accumulator.pool,
accumulator.token0,
accumulator.token1,
windowStart,
windowEnd,
accumulator.open,
accumulator.high,
accumulator.low,
accumulator.close,
accumulator.volume,
accumulator.tradeCount
);
}
}

View File

@@ -0,0 +1,26 @@
package stream.ohlc;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import java.time.Duration;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
public class OHLCPipeline {
public DataStream<OHLCCandle> createOHLCStream(DataStream<Swap> swapStream) {
return swapStream
// NO watermarks needed for processing-time windows
.keyBy(Swap::getPool)
// Use 1-minute processing-time windows
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
// Simply use the aggregator
.aggregate(new OHLCAggregator())
// Filter out empty windows
.filter(candle -> candle.getTradeCount() > 0);
}
}