Compare commits

5 Commits

13 changed files with 483 additions and 106 deletions

View File

@@ -27,13 +27,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.ParameterTool;
import org.apache.flink.util.Collector;
+import stream.config.StreamingDefaults;
import stream.dto.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import stream.io.PoolElaborator;
+import stream.io.PoolTokenIdElaborator;
import stream.io.TokenElaborator;
-import stream.io.SwapEnricher;
+import stream.io.SwapElaborator;
+import stream.io.BlockTimestampElaborator;
+import stream.ohlc.OHLCPipeline;
import stream.source.eventlog.EventLogSourceFactory;
-import stream.source.newheads.NewHeadsSourceFactory;
import java.util.HashMap;
import java.util.Iterator;
@@ -71,53 +73,42 @@ public class DataStreamJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
-// do not do this until considering how secrets are handled by flink
-// env.getConfig().setGlobalJobParameters(parameters);
-URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
+// Async operation parameters
+int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY);
+int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS);
+log.info("Async configuration - Capacity: {}, Timeout: {}s", asyncCapacity, asyncTimeoutSeconds);
// Create ObjectMapper for pretty JSON printing
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
-DataStream<ArbitrumOneBlock> arbitrumHeads = env
-.fromSource(
-NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
-org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
-"ArbitrumOne Head Blocks",
-TypeInformation.of(ArbitrumOneBlock.class)
-);
-DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
-DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
-// Log raw swap events
-swapStream.map(swapLog -> {
-log.info("RAW SWAP EVENT - Pool: {}, Sender: {}, Recipient: {}, Amount0: {}, Amount1: {}, SqrtPriceX96: {}, Liquidity: {}, Tick: {}, TxHash: {}",
-swapLog.address, swapLog.sender, swapLog.recipient,
-swapLog.amount0, swapLog.amount1, swapLog.sqrtPriceX96,
-swapLog.liquidity, swapLog.tick, swapLog.transactionHash);
-return swapLog;
-});
-SingleOutputStreamOperator<ElaboratedSwapEvent> elaboratedSwapStream = AsyncDataStream.unorderedWait(
-swapStream,
-new PoolElaborator(),
-30,
-TimeUnit.SECONDS
-);
+// Add block timestamp to swap events
+SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
+swapStream,
+new BlockTimestampElaborator(),
+asyncTimeoutSeconds,
+TimeUnit.SECONDS,
+asyncCapacity
+);
+SingleOutputStreamOperator<SwapEventWithTokenIds> elaboratedSwapStream = AsyncDataStream.unorderedWait(
+swapWithTimestampStream,
+new PoolTokenIdElaborator(),
+asyncTimeoutSeconds,
+TimeUnit.SECONDS,
+asyncCapacity
+);
// Extract token addresses and elaborate with metadata
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
-.flatMap(new FlatMapFunction<ElaboratedSwapEvent, AddressId>() {
+.flatMap(new FlatMapFunction<SwapEventWithTokenIds, AddressId>() {
@Override
-public void flatMap(ElaboratedSwapEvent event, Collector<AddressId> collector) throws Exception {
+public void flatMap(SwapEventWithTokenIds event, Collector<AddressId> collector) throws Exception {
collector.collect(new AddressId(42161, event.getToken0Address()));
collector.collect(new AddressId(42161, event.getToken1Address()));
}
@@ -127,19 +118,20 @@ public class DataStreamJob {
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
tokenAddresses,
new TokenElaborator(),
-120,
-TimeUnit.SECONDS
+asyncTimeoutSeconds,
+TimeUnit.SECONDS,
+asyncCapacity
);
-// Connect swap events with token metadata to create FullyElaboratedSwap
-DataStream<FullyElaboratedSwap> fullyElaboratedSwaps = elaboratedSwapStream
+// Connect swap events with token metadata to create SwapEventWithTokenMetadata
+DataStream<SwapEventWithTokenMetadata> swapsWithTokens = elaboratedSwapStream
.connect(elaboratedTokens)
-.flatMap(new RichCoFlatMapFunction<ElaboratedSwapEvent, Token, FullyElaboratedSwap>() {
+.flatMap(new RichCoFlatMapFunction<SwapEventWithTokenIds, Token, SwapEventWithTokenMetadata>() {
private final Map<String, Token> tokenCache = new HashMap<>();
-private final Map<String, ElaboratedSwapEvent> pendingSwaps = new HashMap<>();
+private final Map<String, SwapEventWithTokenIds> pendingSwaps = new HashMap<>();
@Override
-public void flatMap1(ElaboratedSwapEvent event, Collector<FullyElaboratedSwap> out) throws Exception {
+public void flatMap1(SwapEventWithTokenIds event, Collector<SwapEventWithTokenMetadata> out) throws Exception {
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
@@ -147,10 +139,9 @@ public class DataStreamJob {
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
-// We have both tokens, create FullyElaboratedSwap
-SwapEventLog swapLog = event.getSwapEvent();
-FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1);
-out.collect(fullSwap);
+// We have both tokens, create SwapEventWithTokenMetadata
+SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
+out.collect(swapWithMetadata);
} else {
// Cache the swap event until we get both tokens
pendingSwaps.put(event.getSwapEvent().transactionHash, event);
@@ -158,38 +149,64 @@ public class DataStreamJob {
}
@Override
-public void flatMap2(Token token, Collector<FullyElaboratedSwap> out) throws Exception {
+public void flatMap2(Token token, Collector<SwapEventWithTokenMetadata> out) throws Exception {
// Cache the token
tokenCache.put(token.address.toLowerCase(), token);
// Check if any pending swaps can now be completed
-Iterator<Map.Entry<String, ElaboratedSwapEvent>> iterator = pendingSwaps.entrySet().iterator();
+Iterator<Map.Entry<String, SwapEventWithTokenIds>> iterator = pendingSwaps.entrySet().iterator();
while (iterator.hasNext()) {
-Map.Entry<String, ElaboratedSwapEvent> entry = iterator.next();
-ElaboratedSwapEvent event = entry.getValue();
+Map.Entry<String, SwapEventWithTokenIds> entry = iterator.next();
+SwapEventWithTokenIds event = entry.getValue();
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
-SwapEventLog swapLog = event.getSwapEvent();
-FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1);
-out.collect(fullSwap);
+SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
+out.collect(swapWithMetadata);
iterator.remove();
}
}
}
});
-// Apply SwapEnricher to create final Swap objects with calculated prices
-DataStream<Swap> enrichedSwaps = fullyElaboratedSwaps
-.map(new SwapEnricher());
+// Apply SwapElaborator to create final Swap objects with calculated prices
+DataStream<Swap> swaps = swapsWithTokens
+.map(new SwapElaborator());
+// Test OHLC with USDC/WETH pool
+OHLCPipeline ohlcPipeline = new OHLCPipeline();
+DataStream<OHLCCandle> allOhlc = ohlcPipeline.createOHLCStream(swaps);
+// Filter and print OHLC candles for USDC/WETH pool only
+allOhlc
+.filter(candle -> {
+// Filter for specific USDC/WETH pool
+String poolAddress = candle.getPool().toLowerCase();
+String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase();
+return poolAddress.equals(targetPool);
+})
+.map(candle -> {
+System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() +
+" Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() +
+" Trades=" + candle.getTradeCount() +
+" Open=" + candle.getOpen() +
+" High=" + candle.getHigh() +
+" Low=" + candle.getLow() +
+" Close=" + candle.getClose() +
+" Volume=" + candle.getVolume());
+return candle;
+})
+.setParallelism(1)
+.returns(OHLCCandle.class)
+.print("USDC-WETH-OHLC");
// Print the final enriched swap objects
-enrichedSwaps
+swaps
.map(swap -> {
try {
String json = mapper.writeValueAsString(swap);
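
The new stream.config.StreamingDefaults class imported above is among the 13 changed files, but its hunk is not shown in this comparison. A minimal sketch consistent with the constants referenced here and in Web3Client could look like the following; the concrete values are placeholders, not taken from the change set:

package stream.config;

// Hypothetical sketch only: the constant names come from the diffs above, the values are illustrative.
public final class StreamingDefaults {
    public static final int ASYNC_CAPACITY = 100;        // default for async.capacity
    public static final int ASYNC_TIMEOUT_SECONDS = 60;  // default for async.timeout.seconds
    public static final int MAX_IDLE_CONNECTIONS = 5;    // default for max_idle_connections
    public static final int KEEP_ALIVE_MINUTES = 60;     // default for keep_alive, in minutes

    private StreamingDefaults() {}
}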

View File

@@ -0,0 +1,56 @@
package stream.dto;
import java.math.BigDecimal;
public class OHLCCandle {
private final String pool;
private final String token0;
private final String token1;
private final long windowStart;
private final long windowEnd;
private final BigDecimal open;
private final BigDecimal high;
private final BigDecimal low;
private final BigDecimal close;
private final BigDecimal volume;
private final int tradeCount;
public OHLCCandle(String pool, String token0, String token1,
long windowStart, long windowEnd,
BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close,
BigDecimal volume, int tradeCount) {
this.pool = pool;
this.token0 = token0;
this.token1 = token1;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.open = open;
this.high = high;
this.low = low;
this.close = close;
this.volume = volume;
this.tradeCount = tradeCount;
}
// Getters
public String getPool() { return pool; }
public String getToken0() { return token0; }
public String getToken1() { return token1; }
public long getWindowStart() { return windowStart; }
public long getWindowEnd() { return windowEnd; }
public BigDecimal getOpen() { return open; }
public BigDecimal getHigh() { return high; }
public BigDecimal getLow() { return low; }
public BigDecimal getClose() { return close; }
public BigDecimal getVolume() { return volume; }
public int getTradeCount() { return tradeCount; }
@Override
public String toString() {
return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d",
pool.substring(0, 8) + "...",
token0.substring(0, 6), token1.substring(0, 6),
windowStart, windowEnd,
open, high, low, close, volume, tradeCount);
}
}

View File

@@ -1,21 +1,30 @@
package stream.dto;
import java.io.Serializable;
+import java.math.BigInteger;
-public class ElaboratedSwapEvent implements Serializable {
+public class SwapEventWithTokenIds implements Serializable {
private SwapEventLog swapEvent;
private String token0Address;
private String token1Address;
+private BigInteger timestamp;
-public ElaboratedSwapEvent() {
+public SwapEventWithTokenIds() {
}
-public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) {
+public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address) {
this.swapEvent = swapEvent;
this.token0Address = token0Address;
this.token1Address = token1Address;
}
+public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address, BigInteger timestamp) {
+this.swapEvent = swapEvent;
+this.token0Address = token0Address;
+this.token1Address = token1Address;
+this.timestamp = timestamp;
+}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
@@ -39,4 +48,12 @@ public class ElaboratedSwapEvent implements Serializable {
public void setToken1Address(String token1Address) {
this.token1Address = token1Address;
}
+public BigInteger getTimestamp() {
+return timestamp;
+}
+public void setTimestamp(BigInteger timestamp) {
+this.timestamp = timestamp;
+}
}

View File

@@ -2,25 +2,25 @@ package stream.dto;
import java.io.Serializable;
-public class FullyElaboratedSwap implements Serializable {
-private ElaboratedSwapEvent swapEvent;
+public class SwapEventWithTokenMetadata implements Serializable {
+private SwapEventWithTokenIds swapEvent;
private Token token0;
private Token token1;
-public FullyElaboratedSwap() {
+public SwapEventWithTokenMetadata() {
}
-public FullyElaboratedSwap(ElaboratedSwapEvent swapEvent, Token token0, Token token1) {
+public SwapEventWithTokenMetadata(SwapEventWithTokenIds swapEvent, Token token0, Token token1) {
this.swapEvent = swapEvent;
this.token0 = token0;
this.token1 = token1;
}
-public ElaboratedSwapEvent getSwapEvent() {
+public SwapEventWithTokenIds getSwapEvent() {
return swapEvent;
}
-public void setSwapEvent(ElaboratedSwapEvent swapEvent) {
+public void setSwapEvent(SwapEventWithTokenIds swapEvent) {
this.swapEvent = swapEvent;
}

View File

@@ -0,0 +1,24 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
public class SwapWithTimestamp implements Serializable {
private static final long serialVersionUID = 1L;
private final SwapEventLog swapEvent;
private final BigInteger timestamp;
public SwapWithTimestamp(SwapEventLog swapEvent, BigInteger timestamp) {
this.swapEvent = swapEvent;
this.timestamp = timestamp;
}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
public BigInteger getTimestamp() {
return timestamp;
}
}

View File

@@ -16,11 +16,14 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
private transient Web3j w3;
+private static final AtomicInteger activeOperations = new AtomicInteger(0);
+private static final AtomicInteger totalOperations = new AtomicInteger(0);
@Override
public void open(OpenContext openContext) throws Exception {
@@ -30,6 +33,13 @@ public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
@Override
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
+int active = activeOperations.incrementAndGet();
+int total = totalOperations.incrementAndGet();
+if (total % 100 == 0) {
+log.info("BlockElaborator - Total operations: {}, Active operations: {}", total, active);
+}
CompletableFuture.supplyAsync(() -> {
try {
return getBlock(blockId);
@@ -37,7 +47,14 @@ public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
log.error("Failed to get block {} on chain {}", blockId, e); log.error("Failed to get block {} on chain {}", blockId, e);
throw new RuntimeException("Error processing block " + blockId, e); throw new RuntimeException("Error processing block " + blockId, e);
} }
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result))); }).thenAccept(result -> {
activeOperations.decrementAndGet();
resultFuture.complete(Collections.singleton(result));
}).exceptionally(throwable -> {
activeOperations.decrementAndGet();
resultFuture.completeExceptionally(throwable);
return null;
});
} }
private EthBlock getBlock(BlockId id) { private EthBlock getBlock(BlockId id) {

View File

@@ -0,0 +1,80 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.core.methods.response.EthBlock;
import stream.dto.BlockHash;
import stream.dto.SwapEventLog;
import stream.dto.SwapWithTimestamp;
import java.math.BigInteger;
import java.util.Collections;
public class BlockTimestampElaborator extends RichAsyncFunction<SwapEventLog, SwapWithTimestamp> {
private static final Logger log = LoggerFactory.getLogger(BlockTimestampElaborator.class);
private transient BlockElaborator blockElaborator;
@Override
public void open(OpenContext openContext) throws Exception {
blockElaborator = new BlockElaborator();
blockElaborator.setRuntimeContext(getRuntimeContext());
blockElaborator.open(openContext);
}
@Override
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<SwapWithTimestamp> resultFuture) {
try {
BlockHash blockHash = new BlockHash();
blockHash.hash = swapEvent.blockHash;
// Create a custom ResultFuture to handle the block elaboration result
ResultFuture<EthBlock> blockResultFuture = new ResultFuture<EthBlock>() {
@Override
public void complete(java.util.Collection<EthBlock> result) {
try {
if (!result.isEmpty()) {
EthBlock ethBlock = result.iterator().next();
BigInteger timestamp = ethBlock.getBlock().getTimestamp();
SwapWithTimestamp swapWithTimestamp = new SwapWithTimestamp(swapEvent, timestamp);
resultFuture.complete(Collections.singleton(swapWithTimestamp));
} else {
log.error("No block found for block hash {}", swapEvent.blockHash);
resultFuture.completeExceptionally(new RuntimeException("No block found"));
}
} catch (Exception e) {
log.error("Failed to get timestamp for block {} of swap in tx {}",
swapEvent.blockHash, swapEvent.transactionHash, e);
resultFuture.completeExceptionally(new RuntimeException("Error getting block timestamp", e));
}
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<EthBlock> collectionSupplier) {
try {
complete(collectionSupplier.get());
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
log.error("Failed to get block {} for swap in tx {}",
swapEvent.blockHash, swapEvent.transactionHash, error);
resultFuture.completeExceptionally(error);
}
};
// Invoke the block elaborator asynchronously
blockElaborator.asyncInvoke(blockHash, blockResultFuture);
} catch (Exception e) {
log.error("Error in BlockTimestampElaborator for swap in tx {}",
swapEvent.transactionHash, e);
resultFuture.completeExceptionally(e);
}
}
}

View File

@@ -1,7 +1,7 @@
package stream.io;
-import stream.dto.SwapEventLog;
-import stream.dto.ElaboratedSwapEvent;
+import stream.dto.SwapWithTimestamp;
+import stream.dto.SwapEventWithTokenIds;
import stream.contract.UniswapV3Pool;
import org.apache.flink.api.common.functions.OpenContext;
@@ -17,8 +17,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
-public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSwapEvent> {
-private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class);
+public class PoolTokenIdElaborator extends RichAsyncFunction<SwapWithTimestamp, SwapEventWithTokenIds> {
+private static final Logger log = LoggerFactory.getLogger(PoolTokenIdElaborator.class);
private transient Web3j web3j;
private transient Credentials credentials;
private transient DefaultGasProvider gasProvider;
@@ -43,12 +43,12 @@ public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSw
}
@Override
-public void asyncInvoke(SwapEventLog swap, ResultFuture<ElaboratedSwapEvent> resultFuture) throws Exception {
+public void asyncInvoke(SwapWithTimestamp swapWithTimestamp, ResultFuture<SwapEventWithTokenIds> resultFuture) throws Exception {
CompletableFuture.supplyAsync(() -> {
try {
// Load the pool contract
UniswapV3Pool pool = UniswapV3Pool.load(
-swap.getAddress(), // Pool address from the event
+swapWithTimestamp.getSwapEvent().getAddress(), // Pool address from the event
web3j,
credentials,
gasProvider
@@ -58,13 +58,14 @@ public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSw
String token0 = pool.token0().send();
String token1 = pool.token1().send();
-// Create enriched event
-return new ElaboratedSwapEvent(swap, token0, token1);
+// Create enriched event with timestamp
+return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), token0, token1, swapWithTimestamp.getTimestamp());
} catch (Exception e) {
-log.error("Error fetching pool tokens", e);
-// Return original without enrichment
-return new ElaboratedSwapEvent(swap, null, null);
+log.error("Error fetching pool tokens for swap in pool {}",
+swapWithTimestamp.getSwapEvent().getAddress(), e);
+// Return original without enrichment but with timestamp
+return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), null, null, swapWithTimestamp.getTimestamp());
}
}).thenAccept(enriched -> {
resultFuture.complete(Collections.singletonList(enriched));
@@ -77,4 +78,4 @@ public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSw
web3j.shutdown();
}
}
}

View File

@@ -11,20 +11,20 @@ import java.math.BigInteger;
import java.math.MathContext;
import java.math.RoundingMode;
-public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
-private static final Logger log = LoggerFactory.getLogger(SwapEnricher.class);
+public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata, Swap> {
+private static final Logger log = LoggerFactory.getLogger(SwapElaborator.class);
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
@Override
-public Swap map(FullyElaboratedSwap fullSwap) throws Exception {
-ElaboratedSwapEvent event = fullSwap.getSwapEvent();
+public Swap map(SwapEventWithTokenMetadata swapWithMetadata) throws Exception {
+SwapEventWithTokenIds event = swapWithMetadata.getSwapEvent();
SwapEventLog swapLog = event.getSwapEvent();
-Token token0 = fullSwap.getToken0();
-Token token1 = fullSwap.getToken1();
+Token token0 = swapWithMetadata.getToken0();
+Token token1 = swapWithMetadata.getToken1();
-// For now, hardcode exchange as UNISWAP_V3 and time as current time
-Long time = System.currentTimeMillis();
+// Use timestamp from block elaboration and set exchange as UNISWAP_V3
+Long time = event.getTimestamp().longValue();
Exchange exchange = Exchange.UNISWAP_V3;
// Pool address
@@ -45,10 +45,9 @@ public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
}
// Determine which token is in and which is out based on amount signs
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
String takerAsset;
String makerAsset;
BigDecimal amountIn;
@@ -59,7 +58,6 @@ public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
// User is sending token0, receiving token1
takerAsset = event.getToken0Address();
makerAsset = event.getToken1Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount0.abs())
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
@@ -79,15 +77,12 @@ public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
amountOut = new BigDecimal(swapLog.amount0)
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
-// Price is how much token0 you get per token1 (inverse of adjustedPrice)
-finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
+// We need to calculate price as token1/token0 to be consistent with token0 as quote currency
+// Currently adjustedPrice = token1/token0, so we keep it as is
+// This ensures price always represents how many token0 (quote) per 1 token1 (base)
+// For example, in WETH/USDC: price = 2000 means 1 WETH costs 2000 USDC
+finalPrice = adjustedPrice;
}
-log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
-pool, token0.symbol + "/" + token1.symbol,
-isToken0In ? token1.symbol : token0.symbol,
-amountIn, amountOut, finalPrice);
// Pass both amountIn and amountOut to constructor with unit price
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
}
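
The Q96 constant above is the 2^96 fixed-point scale of Uniswap v3's sqrtPriceX96. For reference, here is a self-contained sketch of the standard conversion this elaborator builds on; the sample value and pool decimals are hypothetical, and the elaborator's own decimalDiff branch is only partially visible in this hunk:

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.math.RoundingMode;

public class SqrtPriceX96Demo {
    private static final MathContext MC = new MathContext(18, RoundingMode.HALF_UP);
    private static final BigDecimal Q96 = new BigDecimal(2).pow(96);

    // Price of token0 quoted in token1, in human units:
    // (sqrtPriceX96 / 2^96)^2 scaled by 10^(decimals0 - decimals1).
    static BigDecimal price(BigInteger sqrtPriceX96, int decimals0, int decimals1) {
        BigDecimal ratio = new BigDecimal(sqrtPriceX96).divide(Q96, MC);
        BigDecimal raw = ratio.multiply(ratio, MC);
        BigDecimal scale = BigDecimal.TEN.pow(Math.abs(decimals0 - decimals1));
        return decimals0 >= decimals1 ? raw.multiply(scale, MC) : raw.divide(scale, MC);
    }

    public static void main(String[] args) {
        // Hypothetical sqrtPriceX96 for a USDC (6 decimals) / WETH (18 decimals) pool.
        BigInteger sqrtPriceX96 = new BigInteger("1350174849792634181862360983626536");
        System.out.println(price(sqrtPriceX96, 6, 18)); // roughly 0.00029 WETH per USDC
    }
}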

View File

@@ -6,6 +6,7 @@ import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import stream.config.StreamingDefaults;
import java.time.Duration;
import java.util.Arrays;
@@ -25,15 +26,19 @@ public class Web3Client {
synchronized (w3Lock) {
log.info("Initializing Web3 client");
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
-int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5"));
-int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60"));
+int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", String.valueOf(StreamingDefaults.MAX_IDLE_CONNECTIONS)));
+int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", String.valueOf(StreamingDefaults.KEEP_ALIVE_MINUTES)));
Duration timeout = Duration.ofSeconds(30);
-log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
+ConnectionPool connectionPool = new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES);
+log.info("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
url, maxIdleConnections, keepAlive, timeout.getSeconds());
var httpClient = new OkHttpClient.Builder()
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
-.connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES))
+.connectionPool(connectionPool)
.build();
w3 = Web3j.build(new HttpService(url, httpClient));
log.info("Web3 client initialized successfully");

View File

@@ -0,0 +1,41 @@
package stream.ohlc;
import java.math.BigDecimal;
public class OHLCAccumulator {
public String pool = null;
public String token0 = null; // Added
public String token1 = null; // Added
public BigDecimal open = null;
public BigDecimal high = null;
public BigDecimal low = null;
public BigDecimal close = null;
public BigDecimal volume = BigDecimal.ZERO;
public int tradeCount = 0;
public long firstTradeTime = 0;
public long lastTradeTime = 0;
public OHLCAccumulator() {}
public void updatePrice(BigDecimal price, long timestamp) {
if (open == null) {
open = price;
high = price;
low = price;
firstTradeTime = timestamp;
} else {
high = high.max(price);
low = low.min(price);
}
close = price;
lastTradeTime = timestamp;
}
public void addVolume(BigDecimal amount) {
volume = volume.add(amount);
}
public void incrementTradeCount() {
tradeCount++;
}
}
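
For reference, this is how the accumulator folds a short sequence of prices into one candle; the values below are invented:

OHLCAccumulator acc = new OHLCAccumulator();
acc.updatePrice(new BigDecimal("3000"), 1717000000L); // open = high = low = close = 3000
acc.updatePrice(new BigDecimal("2950"), 1717000020L); // low drops to 2950, close = 2950
acc.updatePrice(new BigDecimal("3050"), 1717000040L); // high rises to 3050, close = 3050
acc.addVolume(new BigDecimal("1.5"));
acc.incrementTradeCount();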

View File

@@ -0,0 +1,98 @@
package stream.ohlc;
import org.apache.flink.api.common.functions.AggregateFunction;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
import java.math.BigDecimal;
public class OHLCAggregator implements AggregateFunction<Swap, OHLCAccumulator, OHLCCandle> {
private final long windowSize = 60; // 1 minute in seconds
@Override
public OHLCAccumulator createAccumulator() {
return new OHLCAccumulator();
}
@Override
public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) {
// Initialize pool and tokens on first swap
if (accumulator.pool == null) {
accumulator.pool = swap.getPool();
// Store tokens in consistent order (you could sort by address)
accumulator.token0 = swap.getTakerAsset();
accumulator.token1 = swap.getMakerAsset();
}
// Update OHLC prices
accumulator.updatePrice(swap.getPrice(), swap.getTime());
// Calculate volume (using the taker's input amount as volume)
BigDecimal volume = swap.getAmountIn().abs();
accumulator.addVolume(volume);
// Increment trade count
accumulator.incrementTradeCount();
return accumulator;
}
@Override
public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) {
OHLCAccumulator merged = new OHLCAccumulator();
// Merge pool and token info
merged.pool = acc1.pool != null ? acc1.pool : acc2.pool;
merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0;
merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1;
// Merge OHLC data
if (acc1.open != null && acc2.open != null) {
merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open;
merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close;
merged.high = acc1.high.max(acc2.high);
merged.low = acc1.low.min(acc2.low);
merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime);
merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime);
} else if (acc1.open != null) {
merged.open = acc1.open;
merged.close = acc1.close;
merged.high = acc1.high;
merged.low = acc1.low;
merged.firstTradeTime = acc1.firstTradeTime;
merged.lastTradeTime = acc1.lastTradeTime;
} else if (acc2.open != null) {
merged.open = acc2.open;
merged.close = acc2.close;
merged.high = acc2.high;
merged.low = acc2.low;
merged.firstTradeTime = acc2.firstTradeTime;
merged.lastTradeTime = acc2.lastTradeTime;
}
merged.volume = acc1.volume.add(acc2.volume);
merged.tradeCount = acc1.tradeCount + acc2.tradeCount;
return merged;
}
@Override
public OHLCCandle getResult(OHLCAccumulator accumulator) {
long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize;
long windowEnd = windowStart + windowSize;
return new OHLCCandle(
accumulator.pool,
accumulator.token0,
accumulator.token1,
windowStart,
windowEnd,
accumulator.open,
accumulator.high,
accumulator.low,
accumulator.close,
accumulator.volume,
accumulator.tradeCount
);
}
}
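
Note that getResult derives the candle bounds from the first trade's timestamp rather than from the Flink window itself. A quick worked example of that bucketing arithmetic, assuming timestamps in seconds (the value below is made up):

long firstTradeTime = 1717000045L;             // hypothetical swap time, in seconds
long windowStart = (firstTradeTime / 60) * 60; // 1717000020
long windowEnd = windowStart + 60;             // 1717000080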

View File

@@ -0,0 +1,26 @@
package stream.ohlc;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import java.time.Duration;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
public class OHLCPipeline {
public DataStream<OHLCCandle> createOHLCStream(DataStream<Swap> swapStream) {
return swapStream
// NO watermarks needed for processing-time windows
.keyBy(Swap::getPool)
// Use 1-minute processing-time windows
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
// Simply use the aggregator
.aggregate(new OHLCAggregator())
// Filter out empty windows
.filter(candle -> candle.getTradeCount() > 0);
}
}
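
Taken together with the DataStreamJob changes in the first file, the pipeline is consumed roughly like this; this is a condensed restatement of the wiring already shown above, not additional code from the comparison:

// DataStream<Swap> swaps is produced by the SwapElaborator map step shown earlier.
OHLCPipeline ohlcPipeline = new OHLCPipeline();
DataStream<OHLCCandle> candles = ohlcPipeline.createOHLCStream(swaps);
candles.print("OHLC");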