From 88fce17efcf0655c46976219ccd6472238dda440 Mon Sep 17 00:00:00 2001 From: surbhi Date: Thu, 9 Oct 2025 13:03:31 -0400 Subject: [PATCH] OHLC pipeline stuff --- src/main/java/stream/dto/OHLCCandle.java | 56 +++++++++++ .../java/stream/ohlc/OHLCAccumulator.java | 41 ++++++++ src/main/java/stream/ohlc/OHLCAggregator.java | 98 +++++++++++++++++++ src/main/java/stream/ohlc/OHLCPipeline.java | 26 +++++ 4 files changed, 221 insertions(+) create mode 100644 src/main/java/stream/dto/OHLCCandle.java create mode 100644 src/main/java/stream/ohlc/OHLCAccumulator.java create mode 100644 src/main/java/stream/ohlc/OHLCAggregator.java create mode 100644 src/main/java/stream/ohlc/OHLCPipeline.java diff --git a/src/main/java/stream/dto/OHLCCandle.java b/src/main/java/stream/dto/OHLCCandle.java new file mode 100644 index 0000000..2d3a5a9 --- /dev/null +++ b/src/main/java/stream/dto/OHLCCandle.java @@ -0,0 +1,56 @@ +package stream.dto; + +import java.math.BigDecimal; + +public class OHLCCandle { + private final String pool; + private final String token0; + private final String token1; + private final long windowStart; + private final long windowEnd; + private final BigDecimal open; + private final BigDecimal high; + private final BigDecimal low; + private final BigDecimal close; + private final BigDecimal volume; + private final int tradeCount; + + public OHLCCandle(String pool, String token0, String token1, + long windowStart, long windowEnd, + BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close, + BigDecimal volume, int tradeCount) { + this.pool = pool; + this.token0 = token0; + this.token1 = token1; + this.windowStart = windowStart; + this.windowEnd = windowEnd; + this.open = open; + this.high = high; + this.low = low; + this.close = close; + this.volume = volume; + this.tradeCount = tradeCount; + } + + // Getters + public String getPool() { return pool; } + public String getToken0() { return token0; } + public String getToken1() { return token1; } + public long getWindowStart() { return windowStart; } + public long getWindowEnd() { return windowEnd; } + public BigDecimal getOpen() { return open; } + public BigDecimal getHigh() { return high; } + public BigDecimal getLow() { return low; } + public BigDecimal getClose() { return close; } + public BigDecimal getVolume() { return volume; } + public int getTradeCount() { return tradeCount; } + + @Override + public String toString() { + return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d", + pool.substring(0, 8) + "...", + token0.substring(0, 6), token1.substring(0, 6), + windowStart, windowEnd, + open, high, low, close, volume, tradeCount); + } +} \ No newline at end of file diff --git a/src/main/java/stream/ohlc/OHLCAccumulator.java b/src/main/java/stream/ohlc/OHLCAccumulator.java new file mode 100644 index 0000000..c3ef519 --- /dev/null +++ b/src/main/java/stream/ohlc/OHLCAccumulator.java @@ -0,0 +1,41 @@ +package stream.ohlc; + +import java.math.BigDecimal; + +public class OHLCAccumulator { + public String pool = null; + public String token0 = null; // Added + public String token1 = null; // Added + public BigDecimal open = null; + public BigDecimal high = null; + public BigDecimal low = null; + public BigDecimal close = null; + public BigDecimal volume = BigDecimal.ZERO; + public int tradeCount = 0; + public long firstTradeTime = 0; + public long lastTradeTime = 0; + + public OHLCAccumulator() {} + + public void updatePrice(BigDecimal price, long timestamp) { + if (open == null) { + open = price; + high = price; + low = price; + firstTradeTime = timestamp; + } else { + high = high.max(price); + low = low.min(price); + } + close = price; + lastTradeTime = timestamp; + } + + public void addVolume(BigDecimal amount) { + volume = volume.add(amount); + } + + public void incrementTradeCount() { + tradeCount++; + } +} \ No newline at end of file diff --git a/src/main/java/stream/ohlc/OHLCAggregator.java b/src/main/java/stream/ohlc/OHLCAggregator.java new file mode 100644 index 0000000..a12c885 --- /dev/null +++ b/src/main/java/stream/ohlc/OHLCAggregator.java @@ -0,0 +1,98 @@ +package stream.ohlc; + +import org.apache.flink.api.common.functions.AggregateFunction; +import stream.dto.Swap; +import stream.dto.OHLCCandle; +import java.math.BigDecimal; + +public class OHLCAggregator implements AggregateFunction { + + private final long windowSize = 60; // 1 minute in seconds + + @Override + public OHLCAccumulator createAccumulator() { + return new OHLCAccumulator(); + } + + @Override + public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) { + // Initialize pool and tokens on first swap + if (accumulator.pool == null) { + accumulator.pool = swap.getPool(); + // Store tokens in consistent order (you could sort by address) + accumulator.token0 = swap.getTakerAsset(); + accumulator.token1 = swap.getMakerAsset(); + } + + // Update OHLC prices + accumulator.updatePrice(swap.getPrice(), swap.getTime()); + + // Calculate volume (using the taker's input amount as volume) + BigDecimal volume = swap.getAmountIn().abs(); + accumulator.addVolume(volume); + + // Increment trade count + accumulator.incrementTradeCount(); + + return accumulator; + } + + @Override + public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) { + OHLCAccumulator merged = new OHLCAccumulator(); + + // Merge pool and token info + merged.pool = acc1.pool != null ? acc1.pool : acc2.pool; + merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0; + merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1; + + // Merge OHLC data + if (acc1.open != null && acc2.open != null) { + merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open; + merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close; + merged.high = acc1.high.max(acc2.high); + merged.low = acc1.low.min(acc2.low); + merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime); + merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime); + } else if (acc1.open != null) { + merged.open = acc1.open; + merged.close = acc1.close; + merged.high = acc1.high; + merged.low = acc1.low; + merged.firstTradeTime = acc1.firstTradeTime; + merged.lastTradeTime = acc1.lastTradeTime; + } else if (acc2.open != null) { + merged.open = acc2.open; + merged.close = acc2.close; + merged.high = acc2.high; + merged.low = acc2.low; + merged.firstTradeTime = acc2.firstTradeTime; + merged.lastTradeTime = acc2.lastTradeTime; + } + + merged.volume = acc1.volume.add(acc2.volume); + merged.tradeCount = acc1.tradeCount + acc2.tradeCount; + + return merged; + } + + @Override + public OHLCCandle getResult(OHLCAccumulator accumulator) { + long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize; + long windowEnd = windowStart + windowSize; + + return new OHLCCandle( + accumulator.pool, + accumulator.token0, + accumulator.token1, + windowStart, + windowEnd, + accumulator.open, + accumulator.high, + accumulator.low, + accumulator.close, + accumulator.volume, + accumulator.tradeCount + ); + } +} \ No newline at end of file diff --git a/src/main/java/stream/ohlc/OHLCPipeline.java b/src/main/java/stream/ohlc/OHLCPipeline.java new file mode 100644 index 0000000..9e5be88 --- /dev/null +++ b/src/main/java/stream/ohlc/OHLCPipeline.java @@ -0,0 +1,26 @@ +package stream.ohlc; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import java.time.Duration; +import stream.dto.Swap; +import stream.dto.OHLCCandle; + +public class OHLCPipeline { + + public DataStream createOHLCStream(DataStream swapStream) { + + return swapStream + // NO watermarks needed for processing-time windows + .keyBy(Swap::getPool) + + // Use 1-minute processing-time windows + .window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1))) + + // Simply use the aggregator + .aggregate(new OHLCAggregator()) + + // Filter out empty windows + .filter(candle -> candle.getTradeCount() > 0); + } +} \ No newline at end of file