Compare commits
3 Commits
0d8df3df9c
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 255d8f126d | |||
| 88fce17efc | |||
| b4f1de9d0d |
@@ -27,12 +27,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
|
||||
import org.apache.flink.util.ParameterTool;
|
||||
import org.apache.flink.util.Collector;
|
||||
import stream.config.StreamingDefaults;
|
||||
import stream.dto.*;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
import stream.io.PoolTokenIdElaborator;
|
||||
import stream.io.TokenElaborator;
|
||||
import stream.io.SwapElaborator;
|
||||
import stream.io.BlockTimestampElaborator;
|
||||
import stream.ohlc.OHLCPipeline;
|
||||
import stream.source.eventlog.EventLogSourceFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
@@ -72,6 +74,11 @@ public class DataStreamJob {
|
||||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
env.getConfig().setGlobalJobParameters(parameters);
|
||||
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
|
||||
|
||||
// Async operation parameters
|
||||
int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY);
|
||||
int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS);
|
||||
log.info("Async configuration - Capacity: {}, Timeout: {}s", asyncCapacity, asyncTimeoutSeconds);
|
||||
|
||||
// Create ObjectMapper for pretty JSON printing
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
@@ -84,15 +91,17 @@ public class DataStreamJob {
|
||||
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
|
||||
swapStream,
|
||||
new BlockTimestampElaborator(),
|
||||
30,
|
||||
TimeUnit.SECONDS
|
||||
asyncTimeoutSeconds,
|
||||
TimeUnit.SECONDS,
|
||||
asyncCapacity
|
||||
);
|
||||
|
||||
SingleOutputStreamOperator<SwapEventWithTokenIds> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
||||
swapWithTimestampStream,
|
||||
new PoolTokenIdElaborator(),
|
||||
30,
|
||||
TimeUnit.SECONDS
|
||||
asyncTimeoutSeconds,
|
||||
TimeUnit.SECONDS,
|
||||
asyncCapacity
|
||||
);
|
||||
|
||||
// Extract token addresses and elaborate with metadata
|
||||
@@ -109,8 +118,9 @@ public class DataStreamJob {
|
||||
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
|
||||
tokenAddresses,
|
||||
new TokenElaborator(),
|
||||
120,
|
||||
TimeUnit.SECONDS
|
||||
asyncTimeoutSeconds,
|
||||
TimeUnit.SECONDS,
|
||||
asyncCapacity
|
||||
);
|
||||
|
||||
// Connect swap events with token metadata to create SwapEventWithTokenMetadata
|
||||
@@ -168,6 +178,33 @@ public class DataStreamJob {
|
||||
DataStream<Swap> swaps = swapsWithTokens
|
||||
.map(new SwapElaborator());
|
||||
|
||||
// Test OHLC with USDC/WETH pool
|
||||
OHLCPipeline ohlcPipeline = new OHLCPipeline();
|
||||
DataStream<OHLCCandle> allOhlc = ohlcPipeline.createOHLCStream(swaps);
|
||||
|
||||
// Filter and print OHLC candles for USDC/WETH pool only
|
||||
allOhlc
|
||||
.filter(candle -> {
|
||||
// Filter for specific USDC/WETH pool
|
||||
String poolAddress = candle.getPool().toLowerCase();
|
||||
String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase();
|
||||
return poolAddress.equals(targetPool);
|
||||
})
|
||||
.map(candle -> {
|
||||
System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() +
|
||||
" Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() +
|
||||
" Trades=" + candle.getTradeCount() +
|
||||
" Open=" + candle.getOpen() +
|
||||
" High=" + candle.getHigh() +
|
||||
" Low=" + candle.getLow() +
|
||||
" Close=" + candle.getClose() +
|
||||
" Volume=" + candle.getVolume());
|
||||
return candle;
|
||||
})
|
||||
.setParallelism(1)
|
||||
.returns(OHLCCandle.class)
|
||||
.print("USDC-WETH-OHLC");
|
||||
|
||||
// Print the final enriched swap objects
|
||||
swaps
|
||||
.map(swap -> {
|
||||
|
||||
56
src/main/java/stream/dto/OHLCCandle.java
Normal file
56
src/main/java/stream/dto/OHLCCandle.java
Normal file
@@ -0,0 +1,56 @@
|
||||
package stream.dto;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
public class OHLCCandle {
|
||||
private final String pool;
|
||||
private final String token0;
|
||||
private final String token1;
|
||||
private final long windowStart;
|
||||
private final long windowEnd;
|
||||
private final BigDecimal open;
|
||||
private final BigDecimal high;
|
||||
private final BigDecimal low;
|
||||
private final BigDecimal close;
|
||||
private final BigDecimal volume;
|
||||
private final int tradeCount;
|
||||
|
||||
public OHLCCandle(String pool, String token0, String token1,
|
||||
long windowStart, long windowEnd,
|
||||
BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close,
|
||||
BigDecimal volume, int tradeCount) {
|
||||
this.pool = pool;
|
||||
this.token0 = token0;
|
||||
this.token1 = token1;
|
||||
this.windowStart = windowStart;
|
||||
this.windowEnd = windowEnd;
|
||||
this.open = open;
|
||||
this.high = high;
|
||||
this.low = low;
|
||||
this.close = close;
|
||||
this.volume = volume;
|
||||
this.tradeCount = tradeCount;
|
||||
}
|
||||
|
||||
// Getters
|
||||
public String getPool() { return pool; }
|
||||
public String getToken0() { return token0; }
|
||||
public String getToken1() { return token1; }
|
||||
public long getWindowStart() { return windowStart; }
|
||||
public long getWindowEnd() { return windowEnd; }
|
||||
public BigDecimal getOpen() { return open; }
|
||||
public BigDecimal getHigh() { return high; }
|
||||
public BigDecimal getLow() { return low; }
|
||||
public BigDecimal getClose() { return close; }
|
||||
public BigDecimal getVolume() { return volume; }
|
||||
public int getTradeCount() { return tradeCount; }
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d",
|
||||
pool.substring(0, 8) + "...",
|
||||
token0.substring(0, 6), token1.substring(0, 6),
|
||||
windowStart, windowEnd,
|
||||
open, high, low, close, volume, tradeCount);
|
||||
}
|
||||
}
|
||||
@@ -16,11 +16,14 @@ import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
||||
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
|
||||
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
|
||||
private transient Web3j w3;
|
||||
private static final AtomicInteger activeOperations = new AtomicInteger(0);
|
||||
private static final AtomicInteger totalOperations = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
@@ -30,6 +33,13 @@ public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
|
||||
int active = activeOperations.incrementAndGet();
|
||||
int total = totalOperations.incrementAndGet();
|
||||
|
||||
if (total % 100 == 0) {
|
||||
log.info("BlockElaborator - Total operations: {}, Active operations: {}", total, active);
|
||||
}
|
||||
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return getBlock(blockId);
|
||||
@@ -37,7 +47,14 @@ public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
|
||||
log.error("Failed to get block {} on chain {}", blockId, e);
|
||||
throw new RuntimeException("Error processing block " + blockId, e);
|
||||
}
|
||||
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
|
||||
}).thenAccept(result -> {
|
||||
activeOperations.decrementAndGet();
|
||||
resultFuture.complete(Collections.singleton(result));
|
||||
}).exceptionally(throwable -> {
|
||||
activeOperations.decrementAndGet();
|
||||
resultFuture.completeExceptionally(throwable);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private EthBlock getBlock(BlockId id) {
|
||||
|
||||
@@ -45,10 +45,9 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
|
||||
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
|
||||
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
|
||||
}
|
||||
|
||||
|
||||
// Determine which token is in and which is out based on amount signs
|
||||
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
|
||||
|
||||
String takerAsset;
|
||||
String makerAsset;
|
||||
BigDecimal amountIn;
|
||||
@@ -59,7 +58,6 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
|
||||
// User is sending token0, receiving token1
|
||||
takerAsset = event.getToken0Address();
|
||||
makerAsset = event.getToken1Address();
|
||||
|
||||
// Convert amounts to human-readable format using decimals
|
||||
amountIn = new BigDecimal(swapLog.amount0.abs())
|
||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||
@@ -79,15 +77,12 @@ public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata,
|
||||
amountOut = new BigDecimal(swapLog.amount0)
|
||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||
|
||||
// Price is how much token0 you get per token1 (inverse of adjustedPrice)
|
||||
finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
|
||||
// We need to calculate price as token1/token0 to be consistent with token0 as quote currency
|
||||
// Currently adjustedPrice = token1/token0, so we keep it as is
|
||||
// This ensures price always represents how many token0 (quote) per 1 token1 (base)
|
||||
// For example, in WETH/USDC: price = 2000 means 1 WETH costs 2000 USDC
|
||||
finalPrice = adjustedPrice;
|
||||
}
|
||||
|
||||
log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
|
||||
pool, token0.symbol + "/" + token1.symbol,
|
||||
isToken0In ? token1.symbol : token0.symbol,
|
||||
amountIn, amountOut, finalPrice);
|
||||
|
||||
// Pass both amountIn and amountOut to constructor with unit price
|
||||
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import org.web3j.protocol.Web3j;
|
||||
import org.web3j.protocol.http.HttpService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import stream.config.StreamingDefaults;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
@@ -25,15 +26,19 @@ public class Web3Client {
|
||||
synchronized (w3Lock) {
|
||||
log.info("Initializing Web3 client");
|
||||
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5"));
|
||||
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60"));
|
||||
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", String.valueOf(StreamingDefaults.MAX_IDLE_CONNECTIONS)));
|
||||
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", String.valueOf(StreamingDefaults.KEEP_ALIVE_MINUTES)));
|
||||
Duration timeout = Duration.ofSeconds(30);
|
||||
log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
|
||||
|
||||
ConnectionPool connectionPool = new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES);
|
||||
|
||||
log.info("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
|
||||
url, maxIdleConnections, keepAlive, timeout.getSeconds());
|
||||
|
||||
var httpClient = new OkHttpClient.Builder()
|
||||
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
|
||||
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
|
||||
.connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES))
|
||||
.connectionPool(connectionPool)
|
||||
.build();
|
||||
w3 = Web3j.build(new HttpService(url, httpClient));
|
||||
log.info("Web3 client initialized successfully");
|
||||
|
||||
41
src/main/java/stream/ohlc/OHLCAccumulator.java
Normal file
41
src/main/java/stream/ohlc/OHLCAccumulator.java
Normal file
@@ -0,0 +1,41 @@
|
||||
package stream.ohlc;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
public class OHLCAccumulator {
|
||||
public String pool = null;
|
||||
public String token0 = null; // Added
|
||||
public String token1 = null; // Added
|
||||
public BigDecimal open = null;
|
||||
public BigDecimal high = null;
|
||||
public BigDecimal low = null;
|
||||
public BigDecimal close = null;
|
||||
public BigDecimal volume = BigDecimal.ZERO;
|
||||
public int tradeCount = 0;
|
||||
public long firstTradeTime = 0;
|
||||
public long lastTradeTime = 0;
|
||||
|
||||
public OHLCAccumulator() {}
|
||||
|
||||
public void updatePrice(BigDecimal price, long timestamp) {
|
||||
if (open == null) {
|
||||
open = price;
|
||||
high = price;
|
||||
low = price;
|
||||
firstTradeTime = timestamp;
|
||||
} else {
|
||||
high = high.max(price);
|
||||
low = low.min(price);
|
||||
}
|
||||
close = price;
|
||||
lastTradeTime = timestamp;
|
||||
}
|
||||
|
||||
public void addVolume(BigDecimal amount) {
|
||||
volume = volume.add(amount);
|
||||
}
|
||||
|
||||
public void incrementTradeCount() {
|
||||
tradeCount++;
|
||||
}
|
||||
}
|
||||
98
src/main/java/stream/ohlc/OHLCAggregator.java
Normal file
98
src/main/java/stream/ohlc/OHLCAggregator.java
Normal file
@@ -0,0 +1,98 @@
|
||||
package stream.ohlc;
|
||||
|
||||
import org.apache.flink.api.common.functions.AggregateFunction;
|
||||
import stream.dto.Swap;
|
||||
import stream.dto.OHLCCandle;
|
||||
import java.math.BigDecimal;
|
||||
|
||||
public class OHLCAggregator implements AggregateFunction<Swap, OHLCAccumulator, OHLCCandle> {
|
||||
|
||||
private final long windowSize = 60; // 1 minute in seconds
|
||||
|
||||
@Override
|
||||
public OHLCAccumulator createAccumulator() {
|
||||
return new OHLCAccumulator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) {
|
||||
// Initialize pool and tokens on first swap
|
||||
if (accumulator.pool == null) {
|
||||
accumulator.pool = swap.getPool();
|
||||
// Store tokens in consistent order (you could sort by address)
|
||||
accumulator.token0 = swap.getTakerAsset();
|
||||
accumulator.token1 = swap.getMakerAsset();
|
||||
}
|
||||
|
||||
// Update OHLC prices
|
||||
accumulator.updatePrice(swap.getPrice(), swap.getTime());
|
||||
|
||||
// Calculate volume (using the taker's input amount as volume)
|
||||
BigDecimal volume = swap.getAmountIn().abs();
|
||||
accumulator.addVolume(volume);
|
||||
|
||||
// Increment trade count
|
||||
accumulator.incrementTradeCount();
|
||||
|
||||
return accumulator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) {
|
||||
OHLCAccumulator merged = new OHLCAccumulator();
|
||||
|
||||
// Merge pool and token info
|
||||
merged.pool = acc1.pool != null ? acc1.pool : acc2.pool;
|
||||
merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0;
|
||||
merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1;
|
||||
|
||||
// Merge OHLC data
|
||||
if (acc1.open != null && acc2.open != null) {
|
||||
merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open;
|
||||
merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close;
|
||||
merged.high = acc1.high.max(acc2.high);
|
||||
merged.low = acc1.low.min(acc2.low);
|
||||
merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime);
|
||||
merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime);
|
||||
} else if (acc1.open != null) {
|
||||
merged.open = acc1.open;
|
||||
merged.close = acc1.close;
|
||||
merged.high = acc1.high;
|
||||
merged.low = acc1.low;
|
||||
merged.firstTradeTime = acc1.firstTradeTime;
|
||||
merged.lastTradeTime = acc1.lastTradeTime;
|
||||
} else if (acc2.open != null) {
|
||||
merged.open = acc2.open;
|
||||
merged.close = acc2.close;
|
||||
merged.high = acc2.high;
|
||||
merged.low = acc2.low;
|
||||
merged.firstTradeTime = acc2.firstTradeTime;
|
||||
merged.lastTradeTime = acc2.lastTradeTime;
|
||||
}
|
||||
|
||||
merged.volume = acc1.volume.add(acc2.volume);
|
||||
merged.tradeCount = acc1.tradeCount + acc2.tradeCount;
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OHLCCandle getResult(OHLCAccumulator accumulator) {
|
||||
long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize;
|
||||
long windowEnd = windowStart + windowSize;
|
||||
|
||||
return new OHLCCandle(
|
||||
accumulator.pool,
|
||||
accumulator.token0,
|
||||
accumulator.token1,
|
||||
windowStart,
|
||||
windowEnd,
|
||||
accumulator.open,
|
||||
accumulator.high,
|
||||
accumulator.low,
|
||||
accumulator.close,
|
||||
accumulator.volume,
|
||||
accumulator.tradeCount
|
||||
);
|
||||
}
|
||||
}
|
||||
26
src/main/java/stream/ohlc/OHLCPipeline.java
Normal file
26
src/main/java/stream/ohlc/OHLCPipeline.java
Normal file
@@ -0,0 +1,26 @@
|
||||
package stream.ohlc;
|
||||
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
|
||||
import java.time.Duration;
|
||||
import stream.dto.Swap;
|
||||
import stream.dto.OHLCCandle;
|
||||
|
||||
public class OHLCPipeline {
|
||||
|
||||
public DataStream<OHLCCandle> createOHLCStream(DataStream<Swap> swapStream) {
|
||||
|
||||
return swapStream
|
||||
// NO watermarks needed for processing-time windows
|
||||
.keyBy(Swap::getPool)
|
||||
|
||||
// Use 1-minute processing-time windows
|
||||
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
|
||||
|
||||
// Simply use the aggregator
|
||||
.aggregate(new OHLCAggregator())
|
||||
|
||||
// Filter out empty windows
|
||||
.filter(candle -> candle.getTradeCount() > 0);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user