diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index 9068298..cacf582 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -33,6 +33,7 @@ import stream.io.PoolTokenIdElaborator; import stream.io.TokenElaborator; import stream.io.SwapElaborator; import stream.io.BlockTimestampElaborator; +import stream.ohlc.OHLCPipeline; import stream.source.eventlog.EventLogSourceFactory; import java.util.HashMap; @@ -84,7 +85,7 @@ public class DataStreamJob { SingleOutputStreamOperator swapWithTimestampStream = AsyncDataStream.unorderedWait( swapStream, new BlockTimestampElaborator(), - 30, + 120, TimeUnit.SECONDS ); @@ -168,6 +169,33 @@ public class DataStreamJob { DataStream swaps = swapsWithTokens .map(new SwapElaborator()); + // Test OHLC with USDC/WETH pool + OHLCPipeline ohlcPipeline = new OHLCPipeline(); + DataStream allOhlc = ohlcPipeline.createOHLCStream(swaps); + + // Filter and print OHLC candles for USDC/WETH pool only + allOhlc + .filter(candle -> { + // Filter for specific USDC/WETH pool + String poolAddress = candle.getPool().toLowerCase(); + String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase(); + return poolAddress.equals(targetPool); + }) + .map(candle -> { + System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() + + " Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() + + " Trades=" + candle.getTradeCount() + + " Open=" + candle.getOpen() + + " High=" + candle.getHigh() + + " Low=" + candle.getLow() + + " Close=" + candle.getClose() + + " Volume=" + candle.getVolume()); + return candle; + }) + .setParallelism(1) + .returns(OHLCCandle.class) + .print("USDC-WETH-OHLC"); + // Print the final enriched swap objects swaps .map(swap -> { diff --git a/src/main/java/stream/io/SwapElaborator.java b/src/main/java/stream/io/SwapElaborator.java index f7c67e7..77ca42f 100644 --- a/src/main/java/stream/io/SwapElaborator.java +++ b/src/main/java/stream/io/SwapElaborator.java @@ -45,10 +45,14 @@ public class SwapElaborator extends RichMapFunction {} Amount: {} -> {} Price: {}", - pool, token0.symbol + "/" + token1.symbol, - isToken0In ? token1.symbol : token0.symbol, - amountIn, amountOut, finalPrice); - // Pass both amountIn and amountOut to constructor with unit price return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice); }