Compare commits
7 Commits
04d441d6b1
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 255d8f126d | |||
| 88fce17efc | |||
| b4f1de9d0d | |||
| 0d8df3df9c | |||
| 2fc85c688b | |||
| 47fe85b50b | |||
|
|
6b4cad1479 |
@@ -1,4 +0,0 @@
|
|||||||
--add-opens java.base/java.lang=ALL-UNNAMED
|
|
||||||
--add-opens java.base/java.lang.reflect=ALL-UNNAMED
|
|
||||||
--add-opens java.base/java.net=ALL-UNNAMED
|
|
||||||
--add-opens java.base/java.util=ALL-UNNAMED
|
|
||||||
@@ -24,14 +24,22 @@ import org.apache.flink.streaming.api.datastream.AsyncDataStream;
|
|||||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
|
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
|
||||||
import org.apache.flink.util.ParameterTool;
|
import org.apache.flink.util.ParameterTool;
|
||||||
import org.apache.flink.util.Collector;
|
import org.apache.flink.util.Collector;
|
||||||
|
import stream.config.StreamingDefaults;
|
||||||
import stream.dto.*;
|
import stream.dto.*;
|
||||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||||
import stream.io.PoolElaborator;
|
import stream.io.PoolTokenIdElaborator;
|
||||||
import stream.io.TokenElaborator;
|
import stream.io.TokenElaborator;
|
||||||
|
import stream.io.SwapElaborator;
|
||||||
|
import stream.io.BlockTimestampElaborator;
|
||||||
|
import stream.ohlc.OHLCPipeline;
|
||||||
import stream.source.eventlog.EventLogSourceFactory;
|
import stream.source.eventlog.EventLogSourceFactory;
|
||||||
import stream.source.newheads.NewHeadsSourceFactory;
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
@@ -65,41 +73,42 @@ public class DataStreamJob {
|
|||||||
|
|
||||||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||||
env.getConfig().setGlobalJobParameters(parameters);
|
env.getConfig().setGlobalJobParameters(parameters);
|
||||||
// do not do this until considering how secrets are handled by flink
|
|
||||||
// env.getConfig().setGlobalJobParameters(parameters);
|
|
||||||
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
|
|
||||||
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
|
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
|
||||||
|
|
||||||
|
// Async operation parameters
|
||||||
|
int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY);
|
||||||
|
int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS);
|
||||||
|
log.info("Async configuration - Capacity: {}, Timeout: {}s", asyncCapacity, asyncTimeoutSeconds);
|
||||||
|
|
||||||
// Create ObjectMapper for pretty JSON printing
|
// Create ObjectMapper for pretty JSON printing
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
mapper.enable(SerializationFeature.INDENT_OUTPUT);
|
mapper.enable(SerializationFeature.INDENT_OUTPUT);
|
||||||
|
|
||||||
|
|
||||||
DataStream<ArbitrumOneBlock> arbitrumHeads = env
|
|
||||||
.fromSource(
|
|
||||||
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
|
|
||||||
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
|
|
||||||
"ArbitrumOne Head Blocks",
|
|
||||||
TypeInformation.of(ArbitrumOneBlock.class)
|
|
||||||
);
|
|
||||||
|
|
||||||
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
|
|
||||||
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
|
||||||
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
||||||
|
|
||||||
|
// Add block timestamp to swap events
|
||||||
SingleOutputStreamOperator<ElaboratedSwapEvent> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
|
||||||
swapStream,
|
swapStream,
|
||||||
new PoolElaborator(),
|
new BlockTimestampElaborator(),
|
||||||
30,
|
asyncTimeoutSeconds,
|
||||||
TimeUnit.SECONDS
|
TimeUnit.SECONDS,
|
||||||
|
asyncCapacity
|
||||||
|
);
|
||||||
|
|
||||||
|
SingleOutputStreamOperator<SwapEventWithTokenIds> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
||||||
|
swapWithTimestampStream,
|
||||||
|
new PoolTokenIdElaborator(),
|
||||||
|
asyncTimeoutSeconds,
|
||||||
|
TimeUnit.SECONDS,
|
||||||
|
asyncCapacity
|
||||||
);
|
);
|
||||||
|
|
||||||
// Extract token addresses and elaborate with metadata
|
// Extract token addresses and elaborate with metadata
|
||||||
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
|
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
|
||||||
.flatMap(new FlatMapFunction<ElaboratedSwapEvent, AddressId>() {
|
.flatMap(new FlatMapFunction<SwapEventWithTokenIds, AddressId>() {
|
||||||
@Override
|
@Override
|
||||||
public void flatMap(ElaboratedSwapEvent event, Collector<AddressId> collector) throws Exception {
|
public void flatMap(SwapEventWithTokenIds event, Collector<AddressId> collector) throws Exception {
|
||||||
collector.collect(new AddressId(42161, event.getToken0Address()));
|
collector.collect(new AddressId(42161, event.getToken0Address()));
|
||||||
collector.collect(new AddressId(42161, event.getToken1Address()));
|
collector.collect(new AddressId(42161, event.getToken1Address()));
|
||||||
}
|
}
|
||||||
@@ -109,39 +118,104 @@ public class DataStreamJob {
|
|||||||
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
|
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
|
||||||
tokenAddresses,
|
tokenAddresses,
|
||||||
new TokenElaborator(),
|
new TokenElaborator(),
|
||||||
120,
|
asyncTimeoutSeconds,
|
||||||
TimeUnit.SECONDS
|
TimeUnit.SECONDS,
|
||||||
|
asyncCapacity
|
||||||
);
|
);
|
||||||
|
|
||||||
// Print comprehensive swap event data with token metadata
|
// Connect swap events with token metadata to create SwapEventWithTokenMetadata
|
||||||
elaboratedSwapStream
|
DataStream<SwapEventWithTokenMetadata> swapsWithTokens = elaboratedSwapStream
|
||||||
.map(event -> {
|
.connect(elaboratedTokens)
|
||||||
|
.flatMap(new RichCoFlatMapFunction<SwapEventWithTokenIds, Token, SwapEventWithTokenMetadata>() {
|
||||||
|
private final Map<String, Token> tokenCache = new HashMap<>();
|
||||||
|
private final Map<String, SwapEventWithTokenIds> pendingSwaps = new HashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flatMap1(SwapEventWithTokenIds event, Collector<SwapEventWithTokenMetadata> out) throws Exception {
|
||||||
|
String token0Addr = event.getToken0Address().toLowerCase();
|
||||||
|
String token1Addr = event.getToken1Address().toLowerCase();
|
||||||
|
|
||||||
|
Token token0 = tokenCache.get(token0Addr);
|
||||||
|
Token token1 = tokenCache.get(token1Addr);
|
||||||
|
|
||||||
|
if (token0 != null && token1 != null) {
|
||||||
|
// We have both tokens, create SwapEventWithTokenMetadata
|
||||||
|
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
|
||||||
|
out.collect(swapWithMetadata);
|
||||||
|
} else {
|
||||||
|
// Cache the swap event until we get both tokens
|
||||||
|
pendingSwaps.put(event.getSwapEvent().transactionHash, event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flatMap2(Token token, Collector<SwapEventWithTokenMetadata> out) throws Exception {
|
||||||
|
// Cache the token
|
||||||
|
tokenCache.put(token.address.toLowerCase(), token);
|
||||||
|
|
||||||
|
// Check if any pending swaps can now be completed
|
||||||
|
Iterator<Map.Entry<String, SwapEventWithTokenIds>> iterator = pendingSwaps.entrySet().iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Map.Entry<String, SwapEventWithTokenIds> entry = iterator.next();
|
||||||
|
SwapEventWithTokenIds event = entry.getValue();
|
||||||
|
|
||||||
|
String token0Addr = event.getToken0Address().toLowerCase();
|
||||||
|
String token1Addr = event.getToken1Address().toLowerCase();
|
||||||
|
|
||||||
|
Token token0 = tokenCache.get(token0Addr);
|
||||||
|
Token token1 = tokenCache.get(token1Addr);
|
||||||
|
|
||||||
|
if (token0 != null && token1 != null) {
|
||||||
|
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
|
||||||
|
out.collect(swapWithMetadata);
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Apply SwapElaborator to create final Swap objects with calculated prices
|
||||||
|
DataStream<Swap> swaps = swapsWithTokens
|
||||||
|
.map(new SwapElaborator());
|
||||||
|
|
||||||
|
// Test OHLC with USDC/WETH pool
|
||||||
|
OHLCPipeline ohlcPipeline = new OHLCPipeline();
|
||||||
|
DataStream<OHLCCandle> allOhlc = ohlcPipeline.createOHLCStream(swaps);
|
||||||
|
|
||||||
|
// Filter and print OHLC candles for USDC/WETH pool only
|
||||||
|
allOhlc
|
||||||
|
.filter(candle -> {
|
||||||
|
// Filter for specific USDC/WETH pool
|
||||||
|
String poolAddress = candle.getPool().toLowerCase();
|
||||||
|
String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase();
|
||||||
|
return poolAddress.equals(targetPool);
|
||||||
|
})
|
||||||
|
.map(candle -> {
|
||||||
|
System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() +
|
||||||
|
" Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() +
|
||||||
|
" Trades=" + candle.getTradeCount() +
|
||||||
|
" Open=" + candle.getOpen() +
|
||||||
|
" High=" + candle.getHigh() +
|
||||||
|
" Low=" + candle.getLow() +
|
||||||
|
" Close=" + candle.getClose() +
|
||||||
|
" Volume=" + candle.getVolume());
|
||||||
|
return candle;
|
||||||
|
})
|
||||||
|
.setParallelism(1)
|
||||||
|
.returns(OHLCCandle.class)
|
||||||
|
.print("USDC-WETH-OHLC");
|
||||||
|
|
||||||
|
// Print the final enriched swap objects
|
||||||
|
swaps
|
||||||
|
.map(swap -> {
|
||||||
try {
|
try {
|
||||||
String json = mapper.writeValueAsString(event);
|
String json = mapper.writeValueAsString(swap);
|
||||||
log.info("SWAP EVENT - Pool: {} Block: {} TxHash: {} Token0: {} Token1: {} Amount0: {} Amount1: {} Tick: {}",
|
|
||||||
event.getSwapEvent().address,
|
|
||||||
event.getSwapEvent().blockNumber,
|
|
||||||
event.getSwapEvent().transactionHash,
|
|
||||||
event.getToken0Address(),
|
|
||||||
event.getToken1Address(),
|
|
||||||
event.getSwapEvent().amount0,
|
|
||||||
event.getSwapEvent().amount1,
|
|
||||||
event.getSwapEvent().tick);
|
|
||||||
return json;
|
return json;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return "Error converting elaborated swap event to JSON: " + e.getMessage();
|
return "Error converting enriched swap to JSON: " + e.getMessage();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.print("Swap Event: ");
|
.print("Enriched Swap: ");
|
||||||
|
|
||||||
// Print token metadata when available
|
|
||||||
elaboratedTokens
|
|
||||||
.map(token -> {
|
|
||||||
log.info("TOKEN METADATA - Address: {} Name: {} Symbol: {} Decimals: {}",
|
|
||||||
token.address, token.name, token.symbol, token.decimals);
|
|
||||||
return token.toString();
|
|
||||||
})
|
|
||||||
.print("Token: ");
|
|
||||||
|
|
||||||
env.execute("Ethereum Block Stream");
|
env.execute("Ethereum Block Stream");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ import org.web3j.abi.datatypes.Bool;
|
|||||||
import org.web3j.abi.datatypes.Type;
|
import org.web3j.abi.datatypes.Type;
|
||||||
import org.web3j.abi.datatypes.Utf8String;
|
import org.web3j.abi.datatypes.Utf8String;
|
||||||
import org.web3j.abi.datatypes.generated.*;
|
import org.web3j.abi.datatypes.generated.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.Serial;
|
import java.io.Serial;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
@@ -20,6 +22,8 @@ import static stream.io.EthUtils.keccak;
|
|||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
public class ElaboratedEventType<T extends EventLog> extends EventType {
|
public class ElaboratedEventType<T extends EventLog> extends EventType {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(ElaboratedEventType.class);
|
||||||
|
|
||||||
public transient String name;
|
public transient String name;
|
||||||
public transient List<String> paramNames;
|
public transient List<String> paramNames;
|
||||||
@@ -63,7 +67,9 @@ public class ElaboratedEventType<T extends EventLog> extends EventType {
|
|||||||
int dataValuesIndex = 0;
|
int dataValuesIndex = 0;
|
||||||
int topicsIndex = 1; // the first topic is the event signature
|
int topicsIndex = 1; // the first topic is the event signature
|
||||||
List<Object> args = new ArrayList<>();
|
List<Object> args = new ArrayList<>();
|
||||||
for (TypeReference<Type> paramType : this.paramTypes) {
|
for (int i = 0; i < this.paramTypes.size(); i++) {
|
||||||
|
TypeReference<Type> paramType = this.paramTypes.get(i);
|
||||||
|
String paramName = this.paramNames.get(i);
|
||||||
Object value;
|
Object value;
|
||||||
if (paramType.isIndexed()) {
|
if (paramType.isIndexed()) {
|
||||||
String encoded = topics.get(topicsIndex++);
|
String encoded = topics.get(topicsIndex++);
|
||||||
|
|||||||
56
src/main/java/stream/dto/OHLCCandle.java
Normal file
56
src/main/java/stream/dto/OHLCCandle.java
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package stream.dto;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
|
||||||
|
public class OHLCCandle {
|
||||||
|
private final String pool;
|
||||||
|
private final String token0;
|
||||||
|
private final String token1;
|
||||||
|
private final long windowStart;
|
||||||
|
private final long windowEnd;
|
||||||
|
private final BigDecimal open;
|
||||||
|
private final BigDecimal high;
|
||||||
|
private final BigDecimal low;
|
||||||
|
private final BigDecimal close;
|
||||||
|
private final BigDecimal volume;
|
||||||
|
private final int tradeCount;
|
||||||
|
|
||||||
|
public OHLCCandle(String pool, String token0, String token1,
|
||||||
|
long windowStart, long windowEnd,
|
||||||
|
BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close,
|
||||||
|
BigDecimal volume, int tradeCount) {
|
||||||
|
this.pool = pool;
|
||||||
|
this.token0 = token0;
|
||||||
|
this.token1 = token1;
|
||||||
|
this.windowStart = windowStart;
|
||||||
|
this.windowEnd = windowEnd;
|
||||||
|
this.open = open;
|
||||||
|
this.high = high;
|
||||||
|
this.low = low;
|
||||||
|
this.close = close;
|
||||||
|
this.volume = volume;
|
||||||
|
this.tradeCount = tradeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Getters
|
||||||
|
public String getPool() { return pool; }
|
||||||
|
public String getToken0() { return token0; }
|
||||||
|
public String getToken1() { return token1; }
|
||||||
|
public long getWindowStart() { return windowStart; }
|
||||||
|
public long getWindowEnd() { return windowEnd; }
|
||||||
|
public BigDecimal getOpen() { return open; }
|
||||||
|
public BigDecimal getHigh() { return high; }
|
||||||
|
public BigDecimal getLow() { return low; }
|
||||||
|
public BigDecimal getClose() { return close; }
|
||||||
|
public BigDecimal getVolume() { return volume; }
|
||||||
|
public int getTradeCount() { return tradeCount; }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d",
|
||||||
|
pool.substring(0, 8) + "...",
|
||||||
|
token0.substring(0, 6), token1.substring(0, 6),
|
||||||
|
windowStart, windowEnd,
|
||||||
|
open, high, low, close, volume, tradeCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,23 +8,35 @@ public class Swap extends ChainId {
|
|||||||
@Serial
|
@Serial
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
Long time; // milliseconds
|
public Long time; // milliseconds
|
||||||
Exchange exchange;
|
public Exchange exchange;
|
||||||
String pool; // address
|
public String pool; // address
|
||||||
String takerAsset; // token address
|
public String takerAsset; // token address
|
||||||
String makerAsset; // token address
|
public String makerAsset; // token address
|
||||||
BigDecimal amount; // positive means the taker bought; negative means the taker sold.
|
public BigDecimal amountIn; // positive means the taker bought; negative means the taker sold.
|
||||||
BigDecimal price;
|
public BigDecimal amountOut; // positive means the taker bought; negative means the taker sold.
|
||||||
|
public BigDecimal price;
|
||||||
|
|
||||||
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
|
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
|
||||||
BigDecimal amount, BigDecimal price) {
|
BigDecimal amountIn, BigDecimal amountOut, BigDecimal price) {
|
||||||
super(chainId);
|
super(chainId);
|
||||||
this.time = time;
|
this.time = time;
|
||||||
this.exchange = exchange;
|
this.exchange = exchange;
|
||||||
this.pool = pool;
|
this.pool = pool;
|
||||||
this.takerAsset = takerAsset;
|
this.takerAsset = takerAsset;
|
||||||
this.makerAsset = makerAsset;
|
this.makerAsset = makerAsset;
|
||||||
this.amount = amount;
|
this.amountIn = amountIn;
|
||||||
this.price = price;
|
this.amountOut = amountOut;
|
||||||
|
this.price = price; // Total trade value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Getter methods
|
||||||
|
public Long getTime() { return time; }
|
||||||
|
public Exchange getExchange() { return exchange; }
|
||||||
|
public String getPool() { return pool; }
|
||||||
|
public String getTakerAsset() { return takerAsset; }
|
||||||
|
public String getMakerAsset() { return makerAsset; }
|
||||||
|
public BigDecimal getAmountIn() { return amountIn; }
|
||||||
|
public BigDecimal getAmountOut() { return amountOut; }
|
||||||
|
public BigDecimal getPrice() { return price; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,13 +29,9 @@ public class SwapEventLog extends EventLog implements Serializable {
|
|||||||
|
|
||||||
public String sender;
|
public String sender;
|
||||||
public String recipient;
|
public String recipient;
|
||||||
@BigInt
|
|
||||||
public BigInteger amount0;
|
public BigInteger amount0;
|
||||||
@BigInt
|
|
||||||
public BigInteger amount1;
|
public BigInteger amount1;
|
||||||
@BigInt
|
|
||||||
public BigInteger sqrtPriceX96;
|
public BigInteger sqrtPriceX96;
|
||||||
@BigInt
|
|
||||||
public BigInteger liquidity;
|
public BigInteger liquidity;
|
||||||
public int tick;
|
public int tick;
|
||||||
|
|
||||||
|
|||||||
@@ -1,21 +1,30 @@
|
|||||||
package stream.dto;
|
package stream.dto;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.math.BigInteger;
|
||||||
|
|
||||||
public class ElaboratedSwapEvent implements Serializable {
|
public class SwapEventWithTokenIds implements Serializable {
|
||||||
private SwapEventLog swapEvent;
|
private SwapEventLog swapEvent;
|
||||||
private String token0Address;
|
private String token0Address;
|
||||||
private String token1Address;
|
private String token1Address;
|
||||||
|
private BigInteger timestamp;
|
||||||
|
|
||||||
public ElaboratedSwapEvent() {
|
public SwapEventWithTokenIds() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) {
|
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address) {
|
||||||
this.swapEvent = swapEvent;
|
this.swapEvent = swapEvent;
|
||||||
this.token0Address = token0Address;
|
this.token0Address = token0Address;
|
||||||
this.token1Address = token1Address;
|
this.token1Address = token1Address;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address, BigInteger timestamp) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
this.token0Address = token0Address;
|
||||||
|
this.token1Address = token1Address;
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
public SwapEventLog getSwapEvent() {
|
public SwapEventLog getSwapEvent() {
|
||||||
return swapEvent;
|
return swapEvent;
|
||||||
}
|
}
|
||||||
@@ -39,4 +48,12 @@ public class ElaboratedSwapEvent implements Serializable {
|
|||||||
public void setToken1Address(String token1Address) {
|
public void setToken1Address(String token1Address) {
|
||||||
this.token1Address = token1Address;
|
this.token1Address = token1Address;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public BigInteger getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTimestamp(BigInteger timestamp) {
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -2,25 +2,25 @@ package stream.dto;
|
|||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class FullyElaboratedSwap implements Serializable {
|
public class SwapEventWithTokenMetadata implements Serializable {
|
||||||
private ElaboratedSwapEvent swapEvent;
|
private SwapEventWithTokenIds swapEvent;
|
||||||
private Token token0;
|
private Token token0;
|
||||||
private Token token1;
|
private Token token1;
|
||||||
|
|
||||||
public FullyElaboratedSwap() {
|
public SwapEventWithTokenMetadata() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public FullyElaboratedSwap(ElaboratedSwapEvent swapEvent, Token token0, Token token1) {
|
public SwapEventWithTokenMetadata(SwapEventWithTokenIds swapEvent, Token token0, Token token1) {
|
||||||
this.swapEvent = swapEvent;
|
this.swapEvent = swapEvent;
|
||||||
this.token0 = token0;
|
this.token0 = token0;
|
||||||
this.token1 = token1;
|
this.token1 = token1;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ElaboratedSwapEvent getSwapEvent() {
|
public SwapEventWithTokenIds getSwapEvent() {
|
||||||
return swapEvent;
|
return swapEvent;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setSwapEvent(ElaboratedSwapEvent swapEvent) {
|
public void setSwapEvent(SwapEventWithTokenIds swapEvent) {
|
||||||
this.swapEvent = swapEvent;
|
this.swapEvent = swapEvent;
|
||||||
}
|
}
|
||||||
|
|
||||||
24
src/main/java/stream/dto/SwapWithTimestamp.java
Normal file
24
src/main/java/stream/dto/SwapWithTimestamp.java
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
package stream.dto;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.math.BigInteger;
|
||||||
|
|
||||||
|
public class SwapWithTimestamp implements Serializable {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
private final SwapEventLog swapEvent;
|
||||||
|
private final BigInteger timestamp;
|
||||||
|
|
||||||
|
public SwapWithTimestamp(SwapEventLog swapEvent, BigInteger timestamp) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwapEventLog getSwapEvent() {
|
||||||
|
return swapEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BigInteger getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,11 +16,14 @@ import java.util.Collections;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
|
||||||
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
|
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
|
||||||
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
|
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
|
||||||
private transient Web3j w3;
|
private transient Web3j w3;
|
||||||
|
private static final AtomicInteger activeOperations = new AtomicInteger(0);
|
||||||
|
private static final AtomicInteger totalOperations = new AtomicInteger(0);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(OpenContext openContext) throws Exception {
|
public void open(OpenContext openContext) throws Exception {
|
||||||
@@ -30,6 +33,13 @@ public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
|
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
|
||||||
|
int active = activeOperations.incrementAndGet();
|
||||||
|
int total = totalOperations.incrementAndGet();
|
||||||
|
|
||||||
|
if (total % 100 == 0) {
|
||||||
|
log.info("BlockElaborator - Total operations: {}, Active operations: {}", total, active);
|
||||||
|
}
|
||||||
|
|
||||||
CompletableFuture.supplyAsync(() -> {
|
CompletableFuture.supplyAsync(() -> {
|
||||||
try {
|
try {
|
||||||
return getBlock(blockId);
|
return getBlock(blockId);
|
||||||
@@ -37,7 +47,14 @@ public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
|
|||||||
log.error("Failed to get block {} on chain {}", blockId, e);
|
log.error("Failed to get block {} on chain {}", blockId, e);
|
||||||
throw new RuntimeException("Error processing block " + blockId, e);
|
throw new RuntimeException("Error processing block " + blockId, e);
|
||||||
}
|
}
|
||||||
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
|
}).thenAccept(result -> {
|
||||||
|
activeOperations.decrementAndGet();
|
||||||
|
resultFuture.complete(Collections.singleton(result));
|
||||||
|
}).exceptionally(throwable -> {
|
||||||
|
activeOperations.decrementAndGet();
|
||||||
|
resultFuture.completeExceptionally(throwable);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private EthBlock getBlock(BlockId id) {
|
private EthBlock getBlock(BlockId id) {
|
||||||
|
|||||||
80
src/main/java/stream/io/BlockTimestampElaborator.java
Normal file
80
src/main/java/stream/io/BlockTimestampElaborator.java
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.OpenContext;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.web3j.protocol.core.methods.response.EthBlock;
|
||||||
|
import stream.dto.BlockHash;
|
||||||
|
import stream.dto.SwapEventLog;
|
||||||
|
import stream.dto.SwapWithTimestamp;
|
||||||
|
|
||||||
|
import java.math.BigInteger;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
public class BlockTimestampElaborator extends RichAsyncFunction<SwapEventLog, SwapWithTimestamp> {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(BlockTimestampElaborator.class);
|
||||||
|
private transient BlockElaborator blockElaborator;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(OpenContext openContext) throws Exception {
|
||||||
|
blockElaborator = new BlockElaborator();
|
||||||
|
blockElaborator.setRuntimeContext(getRuntimeContext());
|
||||||
|
blockElaborator.open(openContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<SwapWithTimestamp> resultFuture) {
|
||||||
|
try {
|
||||||
|
BlockHash blockHash = new BlockHash();
|
||||||
|
blockHash.hash = swapEvent.blockHash;
|
||||||
|
|
||||||
|
// Create a custom ResultFuture to handle the block elaboration result
|
||||||
|
ResultFuture<EthBlock> blockResultFuture = new ResultFuture<EthBlock>() {
|
||||||
|
@Override
|
||||||
|
public void complete(java.util.Collection<EthBlock> result) {
|
||||||
|
try {
|
||||||
|
if (!result.isEmpty()) {
|
||||||
|
EthBlock ethBlock = result.iterator().next();
|
||||||
|
BigInteger timestamp = ethBlock.getBlock().getTimestamp();
|
||||||
|
SwapWithTimestamp swapWithTimestamp = new SwapWithTimestamp(swapEvent, timestamp);
|
||||||
|
resultFuture.complete(Collections.singleton(swapWithTimestamp));
|
||||||
|
} else {
|
||||||
|
log.error("No block found for block hash {}", swapEvent.blockHash);
|
||||||
|
resultFuture.completeExceptionally(new RuntimeException("No block found"));
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to get timestamp for block {} of swap in tx {}",
|
||||||
|
swapEvent.blockHash, swapEvent.transactionHash, e);
|
||||||
|
resultFuture.completeExceptionally(new RuntimeException("Error getting block timestamp", e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<EthBlock> collectionSupplier) {
|
||||||
|
try {
|
||||||
|
complete(collectionSupplier.get());
|
||||||
|
} catch (Exception e) {
|
||||||
|
completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completeExceptionally(Throwable error) {
|
||||||
|
log.error("Failed to get block {} for swap in tx {}",
|
||||||
|
swapEvent.blockHash, swapEvent.transactionHash, error);
|
||||||
|
resultFuture.completeExceptionally(error);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Invoke the block elaborator asynchronously
|
||||||
|
blockElaborator.asyncInvoke(blockHash, blockResultFuture);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Error in BlockTimestampElaborator for swap in tx {}",
|
||||||
|
swapEvent.transactionHash, e);
|
||||||
|
resultFuture.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
package stream.io;
|
package stream.io;
|
||||||
|
|
||||||
import stream.dto.SwapEventLog;
|
import stream.dto.SwapWithTimestamp;
|
||||||
import stream.dto.ElaboratedSwapEvent;
|
import stream.dto.SwapEventWithTokenIds;
|
||||||
import stream.contract.UniswapV3Pool;
|
import stream.contract.UniswapV3Pool;
|
||||||
|
|
||||||
import org.apache.flink.api.common.functions.OpenContext;
|
import org.apache.flink.api.common.functions.OpenContext;
|
||||||
@@ -17,8 +17,8 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSwapEvent> {
|
public class PoolTokenIdElaborator extends RichAsyncFunction<SwapWithTimestamp, SwapEventWithTokenIds> {
|
||||||
private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class);
|
private static final Logger log = LoggerFactory.getLogger(PoolTokenIdElaborator.class);
|
||||||
private transient Web3j web3j;
|
private transient Web3j web3j;
|
||||||
private transient Credentials credentials;
|
private transient Credentials credentials;
|
||||||
private transient DefaultGasProvider gasProvider;
|
private transient DefaultGasProvider gasProvider;
|
||||||
@@ -43,12 +43,12 @@ public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSw
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void asyncInvoke(SwapEventLog swap, ResultFuture<ElaboratedSwapEvent> resultFuture) throws Exception {
|
public void asyncInvoke(SwapWithTimestamp swapWithTimestamp, ResultFuture<SwapEventWithTokenIds> resultFuture) throws Exception {
|
||||||
CompletableFuture.supplyAsync(() -> {
|
CompletableFuture.supplyAsync(() -> {
|
||||||
try {
|
try {
|
||||||
// Load the pool contract
|
// Load the pool contract
|
||||||
UniswapV3Pool pool = UniswapV3Pool.load(
|
UniswapV3Pool pool = UniswapV3Pool.load(
|
||||||
swap.getAddress(), // Pool address from the event
|
swapWithTimestamp.getSwapEvent().getAddress(), // Pool address from the event
|
||||||
web3j,
|
web3j,
|
||||||
credentials,
|
credentials,
|
||||||
gasProvider
|
gasProvider
|
||||||
@@ -58,13 +58,14 @@ public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSw
|
|||||||
String token0 = pool.token0().send();
|
String token0 = pool.token0().send();
|
||||||
String token1 = pool.token1().send();
|
String token1 = pool.token1().send();
|
||||||
|
|
||||||
// Create enriched event
|
// Create enriched event with timestamp
|
||||||
return new ElaboratedSwapEvent(swap, token0, token1);
|
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), token0, token1, swapWithTimestamp.getTimestamp());
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error fetching pool tokens", e);
|
log.error("Error fetching pool tokens for swap in pool {}",
|
||||||
// Return original without enrichment
|
swapWithTimestamp.getSwapEvent().getAddress(), e);
|
||||||
return new ElaboratedSwapEvent(swap, null, null);
|
// Return original without enrichment but with timestamp
|
||||||
|
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), null, null, swapWithTimestamp.getTimestamp());
|
||||||
}
|
}
|
||||||
}).thenAccept(enriched -> {
|
}).thenAccept(enriched -> {
|
||||||
resultFuture.complete(Collections.singletonList(enriched));
|
resultFuture.complete(Collections.singletonList(enriched));
|
||||||
@@ -77,4 +78,4 @@ public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSw
|
|||||||
web3j.shutdown();
|
web3j.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -11,20 +11,20 @@ import java.math.BigInteger;
|
|||||||
import java.math.MathContext;
|
import java.math.MathContext;
|
||||||
import java.math.RoundingMode;
|
import java.math.RoundingMode;
|
||||||
|
|
||||||
public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
|
public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata, Swap> {
|
||||||
private static final Logger log = LoggerFactory.getLogger(SwapEnricher.class);
|
private static final Logger log = LoggerFactory.getLogger(SwapElaborator.class);
|
||||||
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
|
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
|
||||||
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
|
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Swap map(FullyElaboratedSwap fullSwap) throws Exception {
|
public Swap map(SwapEventWithTokenMetadata swapWithMetadata) throws Exception {
|
||||||
ElaboratedSwapEvent event = fullSwap.getSwapEvent();
|
SwapEventWithTokenIds event = swapWithMetadata.getSwapEvent();
|
||||||
SwapEventLog swapLog = event.getSwapEvent();
|
SwapEventLog swapLog = event.getSwapEvent();
|
||||||
Token token0 = fullSwap.getToken0();
|
Token token0 = swapWithMetadata.getToken0();
|
||||||
Token token1 = fullSwap.getToken1();
|
Token token1 = swapWithMetadata.getToken1();
|
||||||
|
|
||||||
// For now, hardcode exchange as UNISWAP_V3 and time as current time
|
// Use timestamp from block elaboration and set exchange as UNISWAP_V3
|
||||||
Long time = System.currentTimeMillis();
|
Long time = event.getTimestamp().longValue();
|
||||||
Exchange exchange = Exchange.UNISWAP_V3;
|
Exchange exchange = Exchange.UNISWAP_V3;
|
||||||
|
|
||||||
// Pool address
|
// Pool address
|
||||||
@@ -45,10 +45,9 @@ public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
|
|||||||
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
|
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
|
||||||
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
|
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine which token is in and which is out based on amount signs
|
// Determine which token is in and which is out based on amount signs
|
||||||
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
|
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
|
||||||
|
|
||||||
String takerAsset;
|
String takerAsset;
|
||||||
String makerAsset;
|
String makerAsset;
|
||||||
BigDecimal amountIn;
|
BigDecimal amountIn;
|
||||||
@@ -59,7 +58,6 @@ public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
|
|||||||
// User is sending token0, receiving token1
|
// User is sending token0, receiving token1
|
||||||
takerAsset = event.getToken0Address();
|
takerAsset = event.getToken0Address();
|
||||||
makerAsset = event.getToken1Address();
|
makerAsset = event.getToken1Address();
|
||||||
|
|
||||||
// Convert amounts to human-readable format using decimals
|
// Convert amounts to human-readable format using decimals
|
||||||
amountIn = new BigDecimal(swapLog.amount0.abs())
|
amountIn = new BigDecimal(swapLog.amount0.abs())
|
||||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||||
@@ -79,15 +77,12 @@ public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
|
|||||||
amountOut = new BigDecimal(swapLog.amount0)
|
amountOut = new BigDecimal(swapLog.amount0)
|
||||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||||
|
|
||||||
// Price is how much token0 you get per token1 (inverse of adjustedPrice)
|
// We need to calculate price as token1/token0 to be consistent with token0 as quote currency
|
||||||
finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
|
// Currently adjustedPrice = token1/token0, so we keep it as is
|
||||||
|
// This ensures price always represents how many token0 (quote) per 1 token1 (base)
|
||||||
|
// For example, in WETH/USDC: price = 2000 means 1 WETH costs 2000 USDC
|
||||||
|
finalPrice = adjustedPrice;
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
|
|
||||||
pool, token0.symbol + "/" + token1.symbol,
|
|
||||||
isToken0In ? token1.symbol : token0.symbol,
|
|
||||||
amountIn, amountOut, finalPrice);
|
|
||||||
|
|
||||||
// Pass both amountIn and amountOut to constructor with unit price
|
// Pass both amountIn and amountOut to constructor with unit price
|
||||||
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
|
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
|
||||||
}
|
}
|
||||||
@@ -6,6 +6,7 @@ import org.web3j.protocol.Web3j;
|
|||||||
import org.web3j.protocol.http.HttpService;
|
import org.web3j.protocol.http.HttpService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import stream.config.StreamingDefaults;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@@ -25,15 +26,19 @@ public class Web3Client {
|
|||||||
synchronized (w3Lock) {
|
synchronized (w3Lock) {
|
||||||
log.info("Initializing Web3 client");
|
log.info("Initializing Web3 client");
|
||||||
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
|
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||||
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5"));
|
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", String.valueOf(StreamingDefaults.MAX_IDLE_CONNECTIONS)));
|
||||||
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60"));
|
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", String.valueOf(StreamingDefaults.KEEP_ALIVE_MINUTES)));
|
||||||
Duration timeout = Duration.ofSeconds(30);
|
Duration timeout = Duration.ofSeconds(30);
|
||||||
log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
|
|
||||||
|
ConnectionPool connectionPool = new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES);
|
||||||
|
|
||||||
|
log.info("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
|
||||||
url, maxIdleConnections, keepAlive, timeout.getSeconds());
|
url, maxIdleConnections, keepAlive, timeout.getSeconds());
|
||||||
|
|
||||||
var httpClient = new OkHttpClient.Builder()
|
var httpClient = new OkHttpClient.Builder()
|
||||||
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
|
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
|
||||||
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
|
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
|
||||||
.connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES))
|
.connectionPool(connectionPool)
|
||||||
.build();
|
.build();
|
||||||
w3 = Web3j.build(new HttpService(url, httpClient));
|
w3 = Web3j.build(new HttpService(url, httpClient));
|
||||||
log.info("Web3 client initialized successfully");
|
log.info("Web3 client initialized successfully");
|
||||||
|
|||||||
41
src/main/java/stream/ohlc/OHLCAccumulator.java
Normal file
41
src/main/java/stream/ohlc/OHLCAccumulator.java
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
package stream.ohlc;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
|
||||||
|
public class OHLCAccumulator {
|
||||||
|
public String pool = null;
|
||||||
|
public String token0 = null; // Added
|
||||||
|
public String token1 = null; // Added
|
||||||
|
public BigDecimal open = null;
|
||||||
|
public BigDecimal high = null;
|
||||||
|
public BigDecimal low = null;
|
||||||
|
public BigDecimal close = null;
|
||||||
|
public BigDecimal volume = BigDecimal.ZERO;
|
||||||
|
public int tradeCount = 0;
|
||||||
|
public long firstTradeTime = 0;
|
||||||
|
public long lastTradeTime = 0;
|
||||||
|
|
||||||
|
public OHLCAccumulator() {}
|
||||||
|
|
||||||
|
public void updatePrice(BigDecimal price, long timestamp) {
|
||||||
|
if (open == null) {
|
||||||
|
open = price;
|
||||||
|
high = price;
|
||||||
|
low = price;
|
||||||
|
firstTradeTime = timestamp;
|
||||||
|
} else {
|
||||||
|
high = high.max(price);
|
||||||
|
low = low.min(price);
|
||||||
|
}
|
||||||
|
close = price;
|
||||||
|
lastTradeTime = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addVolume(BigDecimal amount) {
|
||||||
|
volume = volume.add(amount);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrementTradeCount() {
|
||||||
|
tradeCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
98
src/main/java/stream/ohlc/OHLCAggregator.java
Normal file
98
src/main/java/stream/ohlc/OHLCAggregator.java
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package stream.ohlc;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.AggregateFunction;
|
||||||
|
import stream.dto.Swap;
|
||||||
|
import stream.dto.OHLCCandle;
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
|
||||||
|
public class OHLCAggregator implements AggregateFunction<Swap, OHLCAccumulator, OHLCCandle> {
|
||||||
|
|
||||||
|
private final long windowSize = 60; // 1 minute in seconds
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OHLCAccumulator createAccumulator() {
|
||||||
|
return new OHLCAccumulator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) {
|
||||||
|
// Initialize pool and tokens on first swap
|
||||||
|
if (accumulator.pool == null) {
|
||||||
|
accumulator.pool = swap.getPool();
|
||||||
|
// Store tokens in consistent order (you could sort by address)
|
||||||
|
accumulator.token0 = swap.getTakerAsset();
|
||||||
|
accumulator.token1 = swap.getMakerAsset();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update OHLC prices
|
||||||
|
accumulator.updatePrice(swap.getPrice(), swap.getTime());
|
||||||
|
|
||||||
|
// Calculate volume (using the taker's input amount as volume)
|
||||||
|
BigDecimal volume = swap.getAmountIn().abs();
|
||||||
|
accumulator.addVolume(volume);
|
||||||
|
|
||||||
|
// Increment trade count
|
||||||
|
accumulator.incrementTradeCount();
|
||||||
|
|
||||||
|
return accumulator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) {
|
||||||
|
OHLCAccumulator merged = new OHLCAccumulator();
|
||||||
|
|
||||||
|
// Merge pool and token info
|
||||||
|
merged.pool = acc1.pool != null ? acc1.pool : acc2.pool;
|
||||||
|
merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0;
|
||||||
|
merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1;
|
||||||
|
|
||||||
|
// Merge OHLC data
|
||||||
|
if (acc1.open != null && acc2.open != null) {
|
||||||
|
merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open;
|
||||||
|
merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close;
|
||||||
|
merged.high = acc1.high.max(acc2.high);
|
||||||
|
merged.low = acc1.low.min(acc2.low);
|
||||||
|
merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime);
|
||||||
|
merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime);
|
||||||
|
} else if (acc1.open != null) {
|
||||||
|
merged.open = acc1.open;
|
||||||
|
merged.close = acc1.close;
|
||||||
|
merged.high = acc1.high;
|
||||||
|
merged.low = acc1.low;
|
||||||
|
merged.firstTradeTime = acc1.firstTradeTime;
|
||||||
|
merged.lastTradeTime = acc1.lastTradeTime;
|
||||||
|
} else if (acc2.open != null) {
|
||||||
|
merged.open = acc2.open;
|
||||||
|
merged.close = acc2.close;
|
||||||
|
merged.high = acc2.high;
|
||||||
|
merged.low = acc2.low;
|
||||||
|
merged.firstTradeTime = acc2.firstTradeTime;
|
||||||
|
merged.lastTradeTime = acc2.lastTradeTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
merged.volume = acc1.volume.add(acc2.volume);
|
||||||
|
merged.tradeCount = acc1.tradeCount + acc2.tradeCount;
|
||||||
|
|
||||||
|
return merged;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OHLCCandle getResult(OHLCAccumulator accumulator) {
|
||||||
|
long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize;
|
||||||
|
long windowEnd = windowStart + windowSize;
|
||||||
|
|
||||||
|
return new OHLCCandle(
|
||||||
|
accumulator.pool,
|
||||||
|
accumulator.token0,
|
||||||
|
accumulator.token1,
|
||||||
|
windowStart,
|
||||||
|
windowEnd,
|
||||||
|
accumulator.open,
|
||||||
|
accumulator.high,
|
||||||
|
accumulator.low,
|
||||||
|
accumulator.close,
|
||||||
|
accumulator.volume,
|
||||||
|
accumulator.tradeCount
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
26
src/main/java/stream/ohlc/OHLCPipeline.java
Normal file
26
src/main/java/stream/ohlc/OHLCPipeline.java
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
package stream.ohlc;
|
||||||
|
|
||||||
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
|
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
|
||||||
|
import java.time.Duration;
|
||||||
|
import stream.dto.Swap;
|
||||||
|
import stream.dto.OHLCCandle;
|
||||||
|
|
||||||
|
public class OHLCPipeline {
|
||||||
|
|
||||||
|
public DataStream<OHLCCandle> createOHLCStream(DataStream<Swap> swapStream) {
|
||||||
|
|
||||||
|
return swapStream
|
||||||
|
// NO watermarks needed for processing-time windows
|
||||||
|
.keyBy(Swap::getPool)
|
||||||
|
|
||||||
|
// Use 1-minute processing-time windows
|
||||||
|
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
|
||||||
|
|
||||||
|
// Simply use the aggregator
|
||||||
|
.aggregate(new OHLCAggregator())
|
||||||
|
|
||||||
|
// Filter out empty windows
|
||||||
|
.filter(candle -> candle.getTradeCount() > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user