OHLC pipeline stuff
This commit is contained in:
56
src/main/java/stream/dto/OHLCCandle.java
Normal file
56
src/main/java/stream/dto/OHLCCandle.java
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package stream.dto;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
|
||||||
|
public class OHLCCandle {
|
||||||
|
private final String pool;
|
||||||
|
private final String token0;
|
||||||
|
private final String token1;
|
||||||
|
private final long windowStart;
|
||||||
|
private final long windowEnd;
|
||||||
|
private final BigDecimal open;
|
||||||
|
private final BigDecimal high;
|
||||||
|
private final BigDecimal low;
|
||||||
|
private final BigDecimal close;
|
||||||
|
private final BigDecimal volume;
|
||||||
|
private final int tradeCount;
|
||||||
|
|
||||||
|
public OHLCCandle(String pool, String token0, String token1,
|
||||||
|
long windowStart, long windowEnd,
|
||||||
|
BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close,
|
||||||
|
BigDecimal volume, int tradeCount) {
|
||||||
|
this.pool = pool;
|
||||||
|
this.token0 = token0;
|
||||||
|
this.token1 = token1;
|
||||||
|
this.windowStart = windowStart;
|
||||||
|
this.windowEnd = windowEnd;
|
||||||
|
this.open = open;
|
||||||
|
this.high = high;
|
||||||
|
this.low = low;
|
||||||
|
this.close = close;
|
||||||
|
this.volume = volume;
|
||||||
|
this.tradeCount = tradeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Getters
|
||||||
|
public String getPool() { return pool; }
|
||||||
|
public String getToken0() { return token0; }
|
||||||
|
public String getToken1() { return token1; }
|
||||||
|
public long getWindowStart() { return windowStart; }
|
||||||
|
public long getWindowEnd() { return windowEnd; }
|
||||||
|
public BigDecimal getOpen() { return open; }
|
||||||
|
public BigDecimal getHigh() { return high; }
|
||||||
|
public BigDecimal getLow() { return low; }
|
||||||
|
public BigDecimal getClose() { return close; }
|
||||||
|
public BigDecimal getVolume() { return volume; }
|
||||||
|
public int getTradeCount() { return tradeCount; }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d",
|
||||||
|
pool.substring(0, 8) + "...",
|
||||||
|
token0.substring(0, 6), token1.substring(0, 6),
|
||||||
|
windowStart, windowEnd,
|
||||||
|
open, high, low, close, volume, tradeCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
41
src/main/java/stream/ohlc/OHLCAccumulator.java
Normal file
41
src/main/java/stream/ohlc/OHLCAccumulator.java
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
package stream.ohlc;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
|
||||||
|
public class OHLCAccumulator {
|
||||||
|
public String pool = null;
|
||||||
|
public String token0 = null; // Added
|
||||||
|
public String token1 = null; // Added
|
||||||
|
public BigDecimal open = null;
|
||||||
|
public BigDecimal high = null;
|
||||||
|
public BigDecimal low = null;
|
||||||
|
public BigDecimal close = null;
|
||||||
|
public BigDecimal volume = BigDecimal.ZERO;
|
||||||
|
public int tradeCount = 0;
|
||||||
|
public long firstTradeTime = 0;
|
||||||
|
public long lastTradeTime = 0;
|
||||||
|
|
||||||
|
public OHLCAccumulator() {}
|
||||||
|
|
||||||
|
public void updatePrice(BigDecimal price, long timestamp) {
|
||||||
|
if (open == null) {
|
||||||
|
open = price;
|
||||||
|
high = price;
|
||||||
|
low = price;
|
||||||
|
firstTradeTime = timestamp;
|
||||||
|
} else {
|
||||||
|
high = high.max(price);
|
||||||
|
low = low.min(price);
|
||||||
|
}
|
||||||
|
close = price;
|
||||||
|
lastTradeTime = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addVolume(BigDecimal amount) {
|
||||||
|
volume = volume.add(amount);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrementTradeCount() {
|
||||||
|
tradeCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
98
src/main/java/stream/ohlc/OHLCAggregator.java
Normal file
98
src/main/java/stream/ohlc/OHLCAggregator.java
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package stream.ohlc;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.AggregateFunction;
|
||||||
|
import stream.dto.Swap;
|
||||||
|
import stream.dto.OHLCCandle;
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
|
||||||
|
public class OHLCAggregator implements AggregateFunction<Swap, OHLCAccumulator, OHLCCandle> {
|
||||||
|
|
||||||
|
private final long windowSize = 60; // 1 minute in seconds
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OHLCAccumulator createAccumulator() {
|
||||||
|
return new OHLCAccumulator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) {
|
||||||
|
// Initialize pool and tokens on first swap
|
||||||
|
if (accumulator.pool == null) {
|
||||||
|
accumulator.pool = swap.getPool();
|
||||||
|
// Store tokens in consistent order (you could sort by address)
|
||||||
|
accumulator.token0 = swap.getTakerAsset();
|
||||||
|
accumulator.token1 = swap.getMakerAsset();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update OHLC prices
|
||||||
|
accumulator.updatePrice(swap.getPrice(), swap.getTime());
|
||||||
|
|
||||||
|
// Calculate volume (using the taker's input amount as volume)
|
||||||
|
BigDecimal volume = swap.getAmountIn().abs();
|
||||||
|
accumulator.addVolume(volume);
|
||||||
|
|
||||||
|
// Increment trade count
|
||||||
|
accumulator.incrementTradeCount();
|
||||||
|
|
||||||
|
return accumulator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) {
|
||||||
|
OHLCAccumulator merged = new OHLCAccumulator();
|
||||||
|
|
||||||
|
// Merge pool and token info
|
||||||
|
merged.pool = acc1.pool != null ? acc1.pool : acc2.pool;
|
||||||
|
merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0;
|
||||||
|
merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1;
|
||||||
|
|
||||||
|
// Merge OHLC data
|
||||||
|
if (acc1.open != null && acc2.open != null) {
|
||||||
|
merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open;
|
||||||
|
merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close;
|
||||||
|
merged.high = acc1.high.max(acc2.high);
|
||||||
|
merged.low = acc1.low.min(acc2.low);
|
||||||
|
merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime);
|
||||||
|
merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime);
|
||||||
|
} else if (acc1.open != null) {
|
||||||
|
merged.open = acc1.open;
|
||||||
|
merged.close = acc1.close;
|
||||||
|
merged.high = acc1.high;
|
||||||
|
merged.low = acc1.low;
|
||||||
|
merged.firstTradeTime = acc1.firstTradeTime;
|
||||||
|
merged.lastTradeTime = acc1.lastTradeTime;
|
||||||
|
} else if (acc2.open != null) {
|
||||||
|
merged.open = acc2.open;
|
||||||
|
merged.close = acc2.close;
|
||||||
|
merged.high = acc2.high;
|
||||||
|
merged.low = acc2.low;
|
||||||
|
merged.firstTradeTime = acc2.firstTradeTime;
|
||||||
|
merged.lastTradeTime = acc2.lastTradeTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
merged.volume = acc1.volume.add(acc2.volume);
|
||||||
|
merged.tradeCount = acc1.tradeCount + acc2.tradeCount;
|
||||||
|
|
||||||
|
return merged;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OHLCCandle getResult(OHLCAccumulator accumulator) {
|
||||||
|
long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize;
|
||||||
|
long windowEnd = windowStart + windowSize;
|
||||||
|
|
||||||
|
return new OHLCCandle(
|
||||||
|
accumulator.pool,
|
||||||
|
accumulator.token0,
|
||||||
|
accumulator.token1,
|
||||||
|
windowStart,
|
||||||
|
windowEnd,
|
||||||
|
accumulator.open,
|
||||||
|
accumulator.high,
|
||||||
|
accumulator.low,
|
||||||
|
accumulator.close,
|
||||||
|
accumulator.volume,
|
||||||
|
accumulator.tradeCount
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
26
src/main/java/stream/ohlc/OHLCPipeline.java
Normal file
26
src/main/java/stream/ohlc/OHLCPipeline.java
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
package stream.ohlc;
|
||||||
|
|
||||||
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
|
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
|
||||||
|
import java.time.Duration;
|
||||||
|
import stream.dto.Swap;
|
||||||
|
import stream.dto.OHLCCandle;
|
||||||
|
|
||||||
|
public class OHLCPipeline {
|
||||||
|
|
||||||
|
public DataStream<OHLCCandle> createOHLCStream(DataStream<Swap> swapStream) {
|
||||||
|
|
||||||
|
return swapStream
|
||||||
|
// NO watermarks needed for processing-time windows
|
||||||
|
.keyBy(Swap::getPool)
|
||||||
|
|
||||||
|
// Use 1-minute processing-time windows
|
||||||
|
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
|
||||||
|
|
||||||
|
// Simply use the aggregator
|
||||||
|
.aggregate(new OHLCAggregator())
|
||||||
|
|
||||||
|
// Filter out empty windows
|
||||||
|
.filter(candle -> candle.getTradeCount() > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user