Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5429649a39 |
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|||||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
import org.apache.flink.util.ParameterTool;
|
import org.apache.flink.util.ParameterTool;
|
||||||
import stream.dto.*;
|
import stream.dto.*;
|
||||||
|
import stream.io.SwapEventEnricher;
|
||||||
import stream.source.eventlog.EventLogSourceFactory;
|
import stream.source.eventlog.EventLogSourceFactory;
|
||||||
import stream.source.newheads.NewHeadsSourceFactory;
|
import stream.source.newheads.NewHeadsSourceFactory;
|
||||||
|
|
||||||
@@ -74,72 +75,26 @@ public class DataStreamJob {
|
|||||||
TypeInformation.of(ArbitrumOneBlock.class)
|
TypeInformation.of(ArbitrumOneBlock.class)
|
||||||
);
|
);
|
||||||
|
|
||||||
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
|
|
||||||
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
|
||||||
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
||||||
|
|
||||||
// Map the blocks to pretty-printed JSON strings
|
// Create SwapEnricher to add token addresses and metadata
|
||||||
/*
|
SwapEventEnricher swapEnricher = new SwapEventEnricher(httpUri.toString());
|
||||||
blockStream
|
|
||||||
.map(block -> {
|
// Enrich swap events with token addresses and metadata
|
||||||
try {
|
DataStream<EnrichedSwapEventLog> fullyEnrichedSwapStream = org.apache.flink.streaming.api.datastream.AsyncDataStream
|
||||||
return mapper.writeValueAsString(block);
|
.unorderedWait(swapStream, swapEnricher, 10000, java.util.concurrent.TimeUnit.MILLISECONDS)
|
||||||
} catch (Exception e) {
|
.name("Swap Enricher");
|
||||||
return "Error converting block to JSON: " + e.getMessage();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.print("New Ethereum Block: ");
|
|
||||||
|
|
||||||
transferStream
|
// Print fully enriched swap events with all metadata
|
||||||
|
fullyEnrichedSwapStream
|
||||||
.map(event -> {
|
.map(event -> {
|
||||||
try {
|
try {
|
||||||
return mapper.writeValueAsString(event);
|
return mapper.writeValueAsString(event);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return "Error converting transfer event to JSON: " + e.getMessage();
|
return "Error converting enriched swap event to JSON: " + e.getMessage();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.print("Transfer Event: ");
|
.print("Fully Enriched Swap Event: ");
|
||||||
|
|
||||||
swapStream
|
|
||||||
.map(event -> {
|
|
||||||
try {
|
|
||||||
return mapper.writeValueAsString(event);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return "Error converting swap event to JSON: " + e.getMessage();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.print("Swap Event: ");
|
|
||||||
*/
|
|
||||||
|
|
||||||
mintStream
|
|
||||||
.map(event -> {
|
|
||||||
try {
|
|
||||||
return mapper.writeValueAsString(event);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return "Error converting mint event to JSON: " + e.getMessage();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.print("Mint Event: ");
|
|
||||||
|
|
||||||
burnStream
|
|
||||||
.map(event -> {
|
|
||||||
try {
|
|
||||||
return mapper.writeValueAsString(event);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return "Error converting burn event to JSON: " + e.getMessage();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.print("Burn Event: ");
|
|
||||||
|
|
||||||
swapStream
|
|
||||||
.map(event -> {
|
|
||||||
try {
|
|
||||||
return mapper.writeValueAsString(event);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return "Error converting swap event to JSON: " + e.getMessage();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.print("Swap Event: ");
|
|
||||||
|
|
||||||
env.execute("Ethereum Block Stream");
|
env.execute("Ethereum Block Stream");
|
||||||
}
|
}
|
||||||
|
|||||||
38
src/main/java/stream/dto/EnrichedSwapEventLog.java
Normal file
38
src/main/java/stream/dto/EnrichedSwapEventLog.java
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
package stream.dto;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class EnrichedSwapEventLog extends SwapEventLog implements Serializable {
|
||||||
|
|
||||||
|
// Token metadata for token0
|
||||||
|
public Token token0Metadata;
|
||||||
|
|
||||||
|
// Token metadata for token1
|
||||||
|
public Token token1Metadata;
|
||||||
|
|
||||||
|
public EnrichedSwapEventLog() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static EnrichedSwapEventLog fromSwapEvent(SwapEventLog swapEvent) {
|
||||||
|
EnrichedSwapEventLog enriched = new EnrichedSwapEventLog();
|
||||||
|
|
||||||
|
// Copy all base swap event fields
|
||||||
|
enriched.address = swapEvent.address;
|
||||||
|
enriched.sender = swapEvent.sender;
|
||||||
|
enriched.recipient = swapEvent.recipient;
|
||||||
|
enriched.amount0 = swapEvent.amount0;
|
||||||
|
enriched.amount1 = swapEvent.amount1;
|
||||||
|
enriched.sqrtPriceX96 = swapEvent.sqrtPriceX96;
|
||||||
|
enriched.liquidity = swapEvent.liquidity;
|
||||||
|
enriched.tick = swapEvent.tick;
|
||||||
|
enriched.token0 = swapEvent.token0;
|
||||||
|
enriched.token1 = swapEvent.token1;
|
||||||
|
|
||||||
|
// Copy EventLog fields
|
||||||
|
enriched.blockNumber = swapEvent.blockNumber;
|
||||||
|
enriched.transactionHash = swapEvent.transactionHash;
|
||||||
|
|
||||||
|
return enriched;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -38,5 +38,4 @@ public class SwapEventLog extends EventLog implements Serializable {
|
|||||||
@BigInt
|
@BigInt
|
||||||
public BigInteger liquidity;
|
public BigInteger liquidity;
|
||||||
public int tick;
|
public int tick;
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -6,7 +6,31 @@ public class Token extends AddressId {
|
|||||||
@Serial
|
@Serial
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
String name;
|
private String name;
|
||||||
String symbol;
|
private String symbol;
|
||||||
int decimals;
|
private int decimals;
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setName(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSymbol() {
|
||||||
|
return symbol;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSymbol(String symbol) {
|
||||||
|
this.symbol = symbol;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getDecimals() {
|
||||||
|
return decimals;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDecimals(int decimals) {
|
||||||
|
this.decimals = decimals;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
52
src/main/java/stream/io/PoolTokenFetcher.java
Normal file
52
src/main/java/stream/io/PoolTokenFetcher.java
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
public class PoolTokenFetcher {
|
||||||
|
// Uniswap V3 pool method signatures
|
||||||
|
private static final String TOKEN0_SIGNATURE = "0x0dfe1681"; // token0()
|
||||||
|
private static final String TOKEN1_SIGNATURE = "0xd21220a7"; // token1()
|
||||||
|
|
||||||
|
private final JsonRpcClient jsonRpcClient;
|
||||||
|
|
||||||
|
public PoolTokenFetcher(JsonRpcClient jsonRpcClient) {
|
||||||
|
this.jsonRpcClient = jsonRpcClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String fetchToken0(String poolAddress) throws Throwable {
|
||||||
|
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN0_SIGNATURE);
|
||||||
|
Object[] params = new Object[]{request, "latest"};
|
||||||
|
String result = jsonRpcClient.invoke("eth_call", params, String.class);
|
||||||
|
return decodeAddress(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String fetchToken1(String poolAddress) throws Throwable {
|
||||||
|
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN1_SIGNATURE);
|
||||||
|
Object[] params = new Object[]{request, "latest"};
|
||||||
|
String result = jsonRpcClient.invoke("eth_call", params, String.class);
|
||||||
|
return decodeAddress(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String decodeAddress(String hexResult) {
|
||||||
|
if (hexResult == null || hexResult.equals("0x") || hexResult.length() < 42) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Address is the last 20 bytes (40 hex chars) of the result
|
||||||
|
// Remove 0x prefix and get last 40 characters, then add 0x back
|
||||||
|
String hex = hexResult.substring(2);
|
||||||
|
if (hex.length() >= 40) {
|
||||||
|
return "0x" + hex.substring(hex.length() - 40);
|
||||||
|
}
|
||||||
|
|
||||||
|
return hexResult; // Return as-is if unexpected format
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class EthCallRequest {
|
||||||
|
public final String to;
|
||||||
|
public final String data;
|
||||||
|
|
||||||
|
public EthCallRequest(String to, String data) {
|
||||||
|
this.to = to;
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
161
src/main/java/stream/io/SwapEventEnricher.java
Normal file
161
src/main/java/stream/io/SwapEventEnricher.java
Normal file
@@ -0,0 +1,161 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.OpenContext;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||||
|
import stream.dto.EnrichedSwapEventLog;
|
||||||
|
import stream.dto.SwapEventLog;
|
||||||
|
import stream.dto.Token;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
public class SwapEventEnricher extends RichAsyncFunction<SwapEventLog, EnrichedSwapEventLog> {
|
||||||
|
|
||||||
|
private transient JsonRpcClient jsonRpcClient;
|
||||||
|
private transient PoolTokenFetcher poolTokenFetcher;
|
||||||
|
private transient TokenMetadataFetcher tokenMetadataFetcher;
|
||||||
|
private transient TokenCache tokenCache;
|
||||||
|
private final String rpcUrl;
|
||||||
|
|
||||||
|
public SwapEventEnricher(String rpcUrl) {
|
||||||
|
this.rpcUrl = rpcUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(OpenContext openContext) throws Exception {
|
||||||
|
super.open(openContext);
|
||||||
|
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
|
||||||
|
this.poolTokenFetcher = new PoolTokenFetcher(jsonRpcClient);
|
||||||
|
this.tokenMetadataFetcher = new TokenMetadataFetcher(rpcUrl);
|
||||||
|
this.tokenMetadataFetcher.open(openContext);
|
||||||
|
this.tokenCache = new TokenCache();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<EnrichedSwapEventLog> resultFuture) throws Exception {
|
||||||
|
try {
|
||||||
|
// First, get token addresses if not already populated
|
||||||
|
CompletableFuture<Void> tokenAddressFuture;
|
||||||
|
|
||||||
|
if (swapEvent.token0 == null || swapEvent.token1 == null) {
|
||||||
|
tokenAddressFuture = CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
if (swapEvent.token0 == null) {
|
||||||
|
swapEvent.token0 = poolTokenFetcher.fetchToken0(swapEvent.address);
|
||||||
|
}
|
||||||
|
if (swapEvent.token1 == null) {
|
||||||
|
swapEvent.token1 = poolTokenFetcher.fetchToken1(swapEvent.address);
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException("Error fetching token addresses: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
tokenAddressFuture = CompletableFuture.completedFuture(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then fetch metadata for both tokens
|
||||||
|
tokenAddressFuture.thenCompose(v -> {
|
||||||
|
if (swapEvent.token0 == null || swapEvent.token1 == null) {
|
||||||
|
return CompletableFuture.failedFuture(new RuntimeException("Failed to get token addresses"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch metadata for both tokens concurrently (with caching)
|
||||||
|
CompletableFuture<Token> token0MetaFuture = tokenCache.getOrFetch(swapEvent.token0, tokenMetadataFetcher);
|
||||||
|
CompletableFuture<Token> token1MetaFuture = tokenCache.getOrFetch(swapEvent.token1, tokenMetadataFetcher);
|
||||||
|
|
||||||
|
return CompletableFuture.allOf(token0MetaFuture, token1MetaFuture)
|
||||||
|
.thenApply(ignored -> {
|
||||||
|
try {
|
||||||
|
Token token0Meta = token0MetaFuture.get();
|
||||||
|
Token token1Meta = token1MetaFuture.get();
|
||||||
|
|
||||||
|
// Create enriched event with token metadata
|
||||||
|
EnrichedSwapEventLog enriched = EnrichedSwapEventLog.fromSwapEvent(swapEvent);
|
||||||
|
|
||||||
|
// Set token metadata objects
|
||||||
|
if (token0Meta != null) {
|
||||||
|
enriched.token0Metadata = token0Meta;
|
||||||
|
// Also set the address in the Token object
|
||||||
|
token0Meta.address = swapEvent.token0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (token1Meta != null) {
|
||||||
|
enriched.token1Metadata = token1Meta;
|
||||||
|
// Also set the address in the Token object
|
||||||
|
token1Meta.address = swapEvent.token1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return enriched;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException("Error creating enriched event: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.thenAccept(enrichedEvent -> {
|
||||||
|
resultFuture.complete(Collections.singleton(enrichedEvent));
|
||||||
|
})
|
||||||
|
.exceptionally(throwable -> {
|
||||||
|
System.err.println("Error enriching swap event: " + throwable.getMessage());
|
||||||
|
throwable.printStackTrace();
|
||||||
|
resultFuture.completeExceptionally(throwable);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Log cache stats periodically (every 100 events)
|
||||||
|
if (System.currentTimeMillis() % 10000 < 100) {
|
||||||
|
System.out.println("Token cache stats: " + tokenCache.getStats());
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
resultFuture.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class MockResultFuture implements ResultFuture<Token> {
|
||||||
|
private Token result;
|
||||||
|
private Throwable error;
|
||||||
|
private boolean complete = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete(java.util.Collection<Token> results) {
|
||||||
|
if (!results.isEmpty()) {
|
||||||
|
this.result = results.iterator().next();
|
||||||
|
}
|
||||||
|
this.complete = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
|
||||||
|
try {
|
||||||
|
java.util.Collection<Token> resultCollection = results.get();
|
||||||
|
complete(resultCollection);
|
||||||
|
} catch (Exception e) {
|
||||||
|
completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completeExceptionally(Throwable error) {
|
||||||
|
this.error = error;
|
||||||
|
this.complete = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isComplete() {
|
||||||
|
return complete;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasError() {
|
||||||
|
return error != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Token getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Throwable getError() {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
142
src/main/java/stream/io/TokenCache.java
Normal file
142
src/main/java/stream/io/TokenCache.java
Normal file
@@ -0,0 +1,142 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import stream.dto.Token;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thread-safe cache for token metadata to avoid repeated RPC calls
|
||||||
|
* for the same token addresses.
|
||||||
|
*/
|
||||||
|
public class TokenCache {
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<String, Token> cache = new ConcurrentHashMap<>();
|
||||||
|
private final ConcurrentHashMap<String, CompletableFuture<Token>> pendingRequests = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get token from cache if present, otherwise return null
|
||||||
|
*/
|
||||||
|
public Token get(String tokenAddress) {
|
||||||
|
return cache.get(tokenAddress.toLowerCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Put token in cache
|
||||||
|
*/
|
||||||
|
public void put(String tokenAddress, Token token) {
|
||||||
|
if (token != null && tokenAddress != null) {
|
||||||
|
cache.put(tokenAddress.toLowerCase(), token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if token is already in cache
|
||||||
|
*/
|
||||||
|
public boolean contains(String tokenAddress) {
|
||||||
|
return cache.containsKey(tokenAddress.toLowerCase());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get or fetch token metadata with deduplication.
|
||||||
|
* If multiple requests come for the same token simultaneously,
|
||||||
|
* only one will fetch and others will wait for the result.
|
||||||
|
*/
|
||||||
|
public CompletableFuture<Token> getOrFetch(String tokenAddress, TokenMetadataFetcher fetcher) {
|
||||||
|
String normalizedAddress = tokenAddress.toLowerCase();
|
||||||
|
|
||||||
|
// Check cache first
|
||||||
|
Token cachedToken = cache.get(normalizedAddress);
|
||||||
|
if (cachedToken != null) {
|
||||||
|
return CompletableFuture.completedFuture(cachedToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if there's already a pending request for this token
|
||||||
|
CompletableFuture<Token> pendingFuture = pendingRequests.get(normalizedAddress);
|
||||||
|
if (pendingFuture != null) {
|
||||||
|
return pendingFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create new request
|
||||||
|
CompletableFuture<Token> future = new CompletableFuture<>();
|
||||||
|
|
||||||
|
// Register the pending request
|
||||||
|
CompletableFuture<Token> existingFuture = pendingRequests.putIfAbsent(normalizedAddress, future);
|
||||||
|
if (existingFuture != null) {
|
||||||
|
// Another thread beat us to it, return their future
|
||||||
|
return existingFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We're the first, so fetch the token metadata
|
||||||
|
try {
|
||||||
|
fetcher.asyncInvoke(tokenAddress, new org.apache.flink.streaming.api.functions.async.ResultFuture<Token>() {
|
||||||
|
@Override
|
||||||
|
public void complete(java.util.Collection<Token> results) {
|
||||||
|
Token token = results.isEmpty() ? null : results.iterator().next();
|
||||||
|
|
||||||
|
// Cache the result (even if null to avoid repeated failures)
|
||||||
|
if (token != null) {
|
||||||
|
cache.put(normalizedAddress, token);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Complete the future
|
||||||
|
future.complete(token);
|
||||||
|
|
||||||
|
// Remove from pending requests
|
||||||
|
pendingRequests.remove(normalizedAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
|
||||||
|
try {
|
||||||
|
java.util.Collection<Token> resultCollection = results.get();
|
||||||
|
complete(resultCollection);
|
||||||
|
} catch (Exception e) {
|
||||||
|
completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completeExceptionally(Throwable error) {
|
||||||
|
future.completeExceptionally(error);
|
||||||
|
pendingRequests.remove(normalizedAddress);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (Exception e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
pendingRequests.remove(normalizedAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get cache statistics for monitoring
|
||||||
|
*/
|
||||||
|
public CacheStats getStats() {
|
||||||
|
return new CacheStats(cache.size(), pendingRequests.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear all cached entries (useful for testing or memory management)
|
||||||
|
*/
|
||||||
|
public void clear() {
|
||||||
|
cache.clear();
|
||||||
|
// Note: We don't clear pending requests as they're in flight
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class CacheStats {
|
||||||
|
public final int cachedTokens;
|
||||||
|
public final int pendingRequests;
|
||||||
|
|
||||||
|
public CacheStats(int cachedTokens, int pendingRequests) {
|
||||||
|
this.cachedTokens = cachedTokens;
|
||||||
|
this.pendingRequests = pendingRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("TokenCache[cached=%d, pending=%d]", cachedTokens, pendingRequests);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
166
src/main/java/stream/io/TokenMetadataFetcher.java
Normal file
166
src/main/java/stream/io/TokenMetadataFetcher.java
Normal file
@@ -0,0 +1,166 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.OpenContext;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||||
|
import stream.dto.Token;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
public class TokenMetadataFetcher extends RichAsyncFunction<String, Token> {
|
||||||
|
|
||||||
|
// ERC-20 method signatures
|
||||||
|
private static final String NAME_SIGNATURE = "0x06fdde03";
|
||||||
|
private static final String SYMBOL_SIGNATURE = "0x95d89b41";
|
||||||
|
private static final String DECIMALS_SIGNATURE = "0x313ce567";
|
||||||
|
private static final String LATEST_BLOCK = "latest";
|
||||||
|
|
||||||
|
private transient JsonRpcClient jsonRpcClient;
|
||||||
|
private final String rpcUrl;
|
||||||
|
|
||||||
|
public TokenMetadataFetcher(String rpcUrl) {
|
||||||
|
this.rpcUrl = rpcUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(OpenContext openContext) throws Exception {
|
||||||
|
super.open(openContext);
|
||||||
|
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncInvoke(String tokenAddress, ResultFuture<Token> resultFuture) throws Exception {
|
||||||
|
try {
|
||||||
|
// Fetch all metadata concurrently
|
||||||
|
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
return fetchName(tokenAddress);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CompletableFuture<String> symbolFuture = CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
return fetchSymbol(tokenAddress);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
CompletableFuture<String> decimalsFuture = CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
return fetchDecimals(tokenAddress);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Combine all results
|
||||||
|
CompletableFuture.allOf(nameFuture, symbolFuture, decimalsFuture)
|
||||||
|
.thenAccept(v -> {
|
||||||
|
try {
|
||||||
|
Token token = new Token();
|
||||||
|
token.setName(decodeString(nameFuture.get()));
|
||||||
|
token.setSymbol(decodeString(symbolFuture.get()));
|
||||||
|
token.setDecimals(decodeUint8(decimalsFuture.get()));
|
||||||
|
|
||||||
|
resultFuture.complete(Collections.singleton(token));
|
||||||
|
} catch (Exception e) {
|
||||||
|
resultFuture.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.exceptionally(throwable -> {
|
||||||
|
resultFuture.completeExceptionally(throwable);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
resultFuture.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String fetchName(String tokenAddress) throws Throwable {
|
||||||
|
return makeEthCall(tokenAddress, NAME_SIGNATURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String fetchSymbol(String tokenAddress) throws Throwable {
|
||||||
|
return makeEthCall(tokenAddress, SYMBOL_SIGNATURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String fetchDecimals(String tokenAddress) throws Throwable {
|
||||||
|
return makeEthCall(tokenAddress, DECIMALS_SIGNATURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String makeEthCall(String toAddress, String data) throws Throwable {
|
||||||
|
EthCallRequest request = new EthCallRequest(toAddress, data);
|
||||||
|
Object[] params = new Object[]{request, LATEST_BLOCK};
|
||||||
|
return jsonRpcClient.invoke("eth_call", params, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String decodeString(String hexData) {
|
||||||
|
if (hexData == null || hexData.equals("0x") || hexData.length() < 66) {
|
||||||
|
return "Unknown";
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Remove 0x prefix
|
||||||
|
String hex = hexData.substring(2);
|
||||||
|
|
||||||
|
// Skip the first 32 bytes (offset pointer)
|
||||||
|
// The actual string length is at bytes 32-64
|
||||||
|
String lengthHex = hex.substring(64, 128);
|
||||||
|
int length = Integer.parseInt(lengthHex, 16);
|
||||||
|
|
||||||
|
if (length <= 0 || length > 100) { // Sanity check
|
||||||
|
return "Invalid";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the actual string data starting at byte 64
|
||||||
|
String dataHex = hex.substring(128, 128 + (length * 2));
|
||||||
|
|
||||||
|
// Convert hex to string
|
||||||
|
StringBuilder result = new StringBuilder();
|
||||||
|
for (int i = 0; i < dataHex.length(); i += 2) {
|
||||||
|
String hexChar = dataHex.substring(i, i + 2);
|
||||||
|
int charCode = Integer.parseInt(hexChar, 16);
|
||||||
|
if (charCode > 0) { // Skip null bytes
|
||||||
|
result.append((char) charCode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.toString();
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Error decoding string from hex: " + hexData + " - " + e.getMessage());
|
||||||
|
return "DecodeError";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int decodeUint8(String hexData) {
|
||||||
|
if (hexData == null || hexData.equals("0x")) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Remove 0x prefix and get last byte (uint8)
|
||||||
|
String hex = hexData.substring(2);
|
||||||
|
// uint8 is the last 2 hex characters (1 byte)
|
||||||
|
String uint8Hex = hex.substring(hex.length() - 2);
|
||||||
|
return Integer.parseInt(uint8Hex, 16);
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Error decoding uint8 from hex: " + hexData + " - " + e.getMessage());
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class EthCallRequest {
|
||||||
|
public final String to;
|
||||||
|
public final String data;
|
||||||
|
|
||||||
|
public EthCallRequest(String to, String data) {
|
||||||
|
this.to = to;
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user