Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5429649a39 |
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.util.ParameterTool;
|
||||
import stream.dto.*;
|
||||
import stream.io.SwapEventEnricher;
|
||||
import stream.source.eventlog.EventLogSourceFactory;
|
||||
import stream.source.newheads.NewHeadsSourceFactory;
|
||||
|
||||
@@ -74,72 +75,26 @@ public class DataStreamJob {
|
||||
TypeInformation.of(ArbitrumOneBlock.class)
|
||||
);
|
||||
|
||||
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
|
||||
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
||||
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
||||
|
||||
// Map the blocks to pretty-printed JSON strings
|
||||
/*
|
||||
blockStream
|
||||
.map(block -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(block);
|
||||
} catch (Exception e) {
|
||||
return "Error converting block to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("New Ethereum Block: ");
|
||||
// Create SwapEnricher to add token addresses and metadata
|
||||
SwapEventEnricher swapEnricher = new SwapEventEnricher(httpUri.toString());
|
||||
|
||||
transferStream
|
||||
// Enrich swap events with token addresses and metadata
|
||||
DataStream<EnrichedSwapEventLog> fullyEnrichedSwapStream = org.apache.flink.streaming.api.datastream.AsyncDataStream
|
||||
.unorderedWait(swapStream, swapEnricher, 10000, java.util.concurrent.TimeUnit.MILLISECONDS)
|
||||
.name("Swap Enricher");
|
||||
|
||||
// Print fully enriched swap events with all metadata
|
||||
fullyEnrichedSwapStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
} catch (Exception e) {
|
||||
return "Error converting transfer event to JSON: " + e.getMessage();
|
||||
return "Error converting enriched swap event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Transfer Event: ");
|
||||
|
||||
swapStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
} catch (Exception e) {
|
||||
return "Error converting swap event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Swap Event: ");
|
||||
*/
|
||||
|
||||
mintStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
} catch (Exception e) {
|
||||
return "Error converting mint event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Mint Event: ");
|
||||
|
||||
burnStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
} catch (Exception e) {
|
||||
return "Error converting burn event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Burn Event: ");
|
||||
|
||||
swapStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
} catch (Exception e) {
|
||||
return "Error converting swap event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Swap Event: ");
|
||||
.print("Fully Enriched Swap Event: ");
|
||||
|
||||
env.execute("Ethereum Block Stream");
|
||||
}
|
||||
|
||||
38
src/main/java/stream/dto/EnrichedSwapEventLog.java
Normal file
38
src/main/java/stream/dto/EnrichedSwapEventLog.java
Normal file
@@ -0,0 +1,38 @@
|
||||
package stream.dto;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class EnrichedSwapEventLog extends SwapEventLog implements Serializable {
|
||||
|
||||
// Token metadata for token0
|
||||
public Token token0Metadata;
|
||||
|
||||
// Token metadata for token1
|
||||
public Token token1Metadata;
|
||||
|
||||
public EnrichedSwapEventLog() {
|
||||
super();
|
||||
}
|
||||
|
||||
public static EnrichedSwapEventLog fromSwapEvent(SwapEventLog swapEvent) {
|
||||
EnrichedSwapEventLog enriched = new EnrichedSwapEventLog();
|
||||
|
||||
// Copy all base swap event fields
|
||||
enriched.address = swapEvent.address;
|
||||
enriched.sender = swapEvent.sender;
|
||||
enriched.recipient = swapEvent.recipient;
|
||||
enriched.amount0 = swapEvent.amount0;
|
||||
enriched.amount1 = swapEvent.amount1;
|
||||
enriched.sqrtPriceX96 = swapEvent.sqrtPriceX96;
|
||||
enriched.liquidity = swapEvent.liquidity;
|
||||
enriched.tick = swapEvent.tick;
|
||||
enriched.token0 = swapEvent.token0;
|
||||
enriched.token1 = swapEvent.token1;
|
||||
|
||||
// Copy EventLog fields
|
||||
enriched.blockNumber = swapEvent.blockNumber;
|
||||
enriched.transactionHash = swapEvent.transactionHash;
|
||||
|
||||
return enriched;
|
||||
}
|
||||
}
|
||||
@@ -38,5 +38,4 @@ public class SwapEventLog extends EventLog implements Serializable {
|
||||
@BigInt
|
||||
public BigInteger liquidity;
|
||||
public int tick;
|
||||
|
||||
}
|
||||
@@ -6,7 +6,31 @@ public class Token extends AddressId {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
String name;
|
||||
String symbol;
|
||||
int decimals;
|
||||
private String name;
|
||||
private String symbol;
|
||||
private int decimals;
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getSymbol() {
|
||||
return symbol;
|
||||
}
|
||||
|
||||
public void setSymbol(String symbol) {
|
||||
this.symbol = symbol;
|
||||
}
|
||||
|
||||
public int getDecimals() {
|
||||
return decimals;
|
||||
}
|
||||
|
||||
public void setDecimals(int decimals) {
|
||||
this.decimals = decimals;
|
||||
}
|
||||
}
|
||||
|
||||
52
src/main/java/stream/io/PoolTokenFetcher.java
Normal file
52
src/main/java/stream/io/PoolTokenFetcher.java
Normal file
@@ -0,0 +1,52 @@
|
||||
package stream.io;
|
||||
|
||||
public class PoolTokenFetcher {
|
||||
// Uniswap V3 pool method signatures
|
||||
private static final String TOKEN0_SIGNATURE = "0x0dfe1681"; // token0()
|
||||
private static final String TOKEN1_SIGNATURE = "0xd21220a7"; // token1()
|
||||
|
||||
private final JsonRpcClient jsonRpcClient;
|
||||
|
||||
public PoolTokenFetcher(JsonRpcClient jsonRpcClient) {
|
||||
this.jsonRpcClient = jsonRpcClient;
|
||||
}
|
||||
|
||||
public String fetchToken0(String poolAddress) throws Throwable {
|
||||
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN0_SIGNATURE);
|
||||
Object[] params = new Object[]{request, "latest"};
|
||||
String result = jsonRpcClient.invoke("eth_call", params, String.class);
|
||||
return decodeAddress(result);
|
||||
}
|
||||
|
||||
public String fetchToken1(String poolAddress) throws Throwable {
|
||||
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN1_SIGNATURE);
|
||||
Object[] params = new Object[]{request, "latest"};
|
||||
String result = jsonRpcClient.invoke("eth_call", params, String.class);
|
||||
return decodeAddress(result);
|
||||
}
|
||||
|
||||
private String decodeAddress(String hexResult) {
|
||||
if (hexResult == null || hexResult.equals("0x") || hexResult.length() < 42) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Address is the last 20 bytes (40 hex chars) of the result
|
||||
// Remove 0x prefix and get last 40 characters, then add 0x back
|
||||
String hex = hexResult.substring(2);
|
||||
if (hex.length() >= 40) {
|
||||
return "0x" + hex.substring(hex.length() - 40);
|
||||
}
|
||||
|
||||
return hexResult; // Return as-is if unexpected format
|
||||
}
|
||||
|
||||
private static class EthCallRequest {
|
||||
public final String to;
|
||||
public final String data;
|
||||
|
||||
public EthCallRequest(String to, String data) {
|
||||
this.to = to;
|
||||
this.data = data;
|
||||
}
|
||||
}
|
||||
}
|
||||
161
src/main/java/stream/io/SwapEventEnricher.java
Normal file
161
src/main/java/stream/io/SwapEventEnricher.java
Normal file
@@ -0,0 +1,161 @@
|
||||
package stream.io;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import stream.dto.EnrichedSwapEventLog;
|
||||
import stream.dto.SwapEventLog;
|
||||
import stream.dto.Token;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class SwapEventEnricher extends RichAsyncFunction<SwapEventLog, EnrichedSwapEventLog> {
|
||||
|
||||
private transient JsonRpcClient jsonRpcClient;
|
||||
private transient PoolTokenFetcher poolTokenFetcher;
|
||||
private transient TokenMetadataFetcher tokenMetadataFetcher;
|
||||
private transient TokenCache tokenCache;
|
||||
private final String rpcUrl;
|
||||
|
||||
public SwapEventEnricher(String rpcUrl) {
|
||||
this.rpcUrl = rpcUrl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
super.open(openContext);
|
||||
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
|
||||
this.poolTokenFetcher = new PoolTokenFetcher(jsonRpcClient);
|
||||
this.tokenMetadataFetcher = new TokenMetadataFetcher(rpcUrl);
|
||||
this.tokenMetadataFetcher.open(openContext);
|
||||
this.tokenCache = new TokenCache();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<EnrichedSwapEventLog> resultFuture) throws Exception {
|
||||
try {
|
||||
// First, get token addresses if not already populated
|
||||
CompletableFuture<Void> tokenAddressFuture;
|
||||
|
||||
if (swapEvent.token0 == null || swapEvent.token1 == null) {
|
||||
tokenAddressFuture = CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
if (swapEvent.token0 == null) {
|
||||
swapEvent.token0 = poolTokenFetcher.fetchToken0(swapEvent.address);
|
||||
}
|
||||
if (swapEvent.token1 == null) {
|
||||
swapEvent.token1 = poolTokenFetcher.fetchToken1(swapEvent.address);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException("Error fetching token addresses: " + e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
tokenAddressFuture = CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
// Then fetch metadata for both tokens
|
||||
tokenAddressFuture.thenCompose(v -> {
|
||||
if (swapEvent.token0 == null || swapEvent.token1 == null) {
|
||||
return CompletableFuture.failedFuture(new RuntimeException("Failed to get token addresses"));
|
||||
}
|
||||
|
||||
// Fetch metadata for both tokens concurrently (with caching)
|
||||
CompletableFuture<Token> token0MetaFuture = tokenCache.getOrFetch(swapEvent.token0, tokenMetadataFetcher);
|
||||
CompletableFuture<Token> token1MetaFuture = tokenCache.getOrFetch(swapEvent.token1, tokenMetadataFetcher);
|
||||
|
||||
return CompletableFuture.allOf(token0MetaFuture, token1MetaFuture)
|
||||
.thenApply(ignored -> {
|
||||
try {
|
||||
Token token0Meta = token0MetaFuture.get();
|
||||
Token token1Meta = token1MetaFuture.get();
|
||||
|
||||
// Create enriched event with token metadata
|
||||
EnrichedSwapEventLog enriched = EnrichedSwapEventLog.fromSwapEvent(swapEvent);
|
||||
|
||||
// Set token metadata objects
|
||||
if (token0Meta != null) {
|
||||
enriched.token0Metadata = token0Meta;
|
||||
// Also set the address in the Token object
|
||||
token0Meta.address = swapEvent.token0;
|
||||
}
|
||||
|
||||
if (token1Meta != null) {
|
||||
enriched.token1Metadata = token1Meta;
|
||||
// Also set the address in the Token object
|
||||
token1Meta.address = swapEvent.token1;
|
||||
}
|
||||
|
||||
return enriched;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error creating enriched event: " + e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
})
|
||||
.thenAccept(enrichedEvent -> {
|
||||
resultFuture.complete(Collections.singleton(enrichedEvent));
|
||||
})
|
||||
.exceptionally(throwable -> {
|
||||
System.err.println("Error enriching swap event: " + throwable.getMessage());
|
||||
throwable.printStackTrace();
|
||||
resultFuture.completeExceptionally(throwable);
|
||||
return null;
|
||||
});
|
||||
|
||||
// Log cache stats periodically (every 100 events)
|
||||
if (System.currentTimeMillis() % 10000 < 100) {
|
||||
System.out.println("Token cache stats: " + tokenCache.getStats());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
resultFuture.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockResultFuture implements ResultFuture<Token> {
|
||||
private Token result;
|
||||
private Throwable error;
|
||||
private boolean complete = false;
|
||||
|
||||
@Override
|
||||
public void complete(java.util.Collection<Token> results) {
|
||||
if (!results.isEmpty()) {
|
||||
this.result = results.iterator().next();
|
||||
}
|
||||
this.complete = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
|
||||
try {
|
||||
java.util.Collection<Token> resultCollection = results.get();
|
||||
complete(resultCollection);
|
||||
} catch (Exception e) {
|
||||
completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeExceptionally(Throwable error) {
|
||||
this.error = error;
|
||||
this.complete = true;
|
||||
}
|
||||
|
||||
public boolean isComplete() {
|
||||
return complete;
|
||||
}
|
||||
|
||||
public boolean hasError() {
|
||||
return error != null;
|
||||
}
|
||||
|
||||
public Token getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public Throwable getError() {
|
||||
return error;
|
||||
}
|
||||
}
|
||||
}
|
||||
142
src/main/java/stream/io/TokenCache.java
Normal file
142
src/main/java/stream/io/TokenCache.java
Normal file
@@ -0,0 +1,142 @@
|
||||
package stream.io;
|
||||
|
||||
import stream.dto.Token;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Thread-safe cache for token metadata to avoid repeated RPC calls
|
||||
* for the same token addresses.
|
||||
*/
|
||||
public class TokenCache {
|
||||
|
||||
private final ConcurrentHashMap<String, Token> cache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, CompletableFuture<Token>> pendingRequests = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Get token from cache if present, otherwise return null
|
||||
*/
|
||||
public Token get(String tokenAddress) {
|
||||
return cache.get(tokenAddress.toLowerCase());
|
||||
}
|
||||
|
||||
/**
|
||||
* Put token in cache
|
||||
*/
|
||||
public void put(String tokenAddress, Token token) {
|
||||
if (token != null && tokenAddress != null) {
|
||||
cache.put(tokenAddress.toLowerCase(), token);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if token is already in cache
|
||||
*/
|
||||
public boolean contains(String tokenAddress) {
|
||||
return cache.containsKey(tokenAddress.toLowerCase());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or fetch token metadata with deduplication.
|
||||
* If multiple requests come for the same token simultaneously,
|
||||
* only one will fetch and others will wait for the result.
|
||||
*/
|
||||
public CompletableFuture<Token> getOrFetch(String tokenAddress, TokenMetadataFetcher fetcher) {
|
||||
String normalizedAddress = tokenAddress.toLowerCase();
|
||||
|
||||
// Check cache first
|
||||
Token cachedToken = cache.get(normalizedAddress);
|
||||
if (cachedToken != null) {
|
||||
return CompletableFuture.completedFuture(cachedToken);
|
||||
}
|
||||
|
||||
// Check if there's already a pending request for this token
|
||||
CompletableFuture<Token> pendingFuture = pendingRequests.get(normalizedAddress);
|
||||
if (pendingFuture != null) {
|
||||
return pendingFuture;
|
||||
}
|
||||
|
||||
// Create new request
|
||||
CompletableFuture<Token> future = new CompletableFuture<>();
|
||||
|
||||
// Register the pending request
|
||||
CompletableFuture<Token> existingFuture = pendingRequests.putIfAbsent(normalizedAddress, future);
|
||||
if (existingFuture != null) {
|
||||
// Another thread beat us to it, return their future
|
||||
return existingFuture;
|
||||
}
|
||||
|
||||
// We're the first, so fetch the token metadata
|
||||
try {
|
||||
fetcher.asyncInvoke(tokenAddress, new org.apache.flink.streaming.api.functions.async.ResultFuture<Token>() {
|
||||
@Override
|
||||
public void complete(java.util.Collection<Token> results) {
|
||||
Token token = results.isEmpty() ? null : results.iterator().next();
|
||||
|
||||
// Cache the result (even if null to avoid repeated failures)
|
||||
if (token != null) {
|
||||
cache.put(normalizedAddress, token);
|
||||
}
|
||||
|
||||
// Complete the future
|
||||
future.complete(token);
|
||||
|
||||
// Remove from pending requests
|
||||
pendingRequests.remove(normalizedAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
|
||||
try {
|
||||
java.util.Collection<Token> resultCollection = results.get();
|
||||
complete(resultCollection);
|
||||
} catch (Exception e) {
|
||||
completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeExceptionally(Throwable error) {
|
||||
future.completeExceptionally(error);
|
||||
pendingRequests.remove(normalizedAddress);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
pendingRequests.remove(normalizedAddress);
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get cache statistics for monitoring
|
||||
*/
|
||||
public CacheStats getStats() {
|
||||
return new CacheStats(cache.size(), pendingRequests.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all cached entries (useful for testing or memory management)
|
||||
*/
|
||||
public void clear() {
|
||||
cache.clear();
|
||||
// Note: We don't clear pending requests as they're in flight
|
||||
}
|
||||
|
||||
public static class CacheStats {
|
||||
public final int cachedTokens;
|
||||
public final int pendingRequests;
|
||||
|
||||
public CacheStats(int cachedTokens, int pendingRequests) {
|
||||
this.cachedTokens = cachedTokens;
|
||||
this.pendingRequests = pendingRequests;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("TokenCache[cached=%d, pending=%d]", cachedTokens, pendingRequests);
|
||||
}
|
||||
}
|
||||
}
|
||||
166
src/main/java/stream/io/TokenMetadataFetcher.java
Normal file
166
src/main/java/stream/io/TokenMetadataFetcher.java
Normal file
@@ -0,0 +1,166 @@
|
||||
package stream.io;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import stream.dto.Token;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class TokenMetadataFetcher extends RichAsyncFunction<String, Token> {
|
||||
|
||||
// ERC-20 method signatures
|
||||
private static final String NAME_SIGNATURE = "0x06fdde03";
|
||||
private static final String SYMBOL_SIGNATURE = "0x95d89b41";
|
||||
private static final String DECIMALS_SIGNATURE = "0x313ce567";
|
||||
private static final String LATEST_BLOCK = "latest";
|
||||
|
||||
private transient JsonRpcClient jsonRpcClient;
|
||||
private final String rpcUrl;
|
||||
|
||||
public TokenMetadataFetcher(String rpcUrl) {
|
||||
this.rpcUrl = rpcUrl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
super.open(openContext);
|
||||
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(String tokenAddress, ResultFuture<Token> resultFuture) throws Exception {
|
||||
try {
|
||||
// Fetch all metadata concurrently
|
||||
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return fetchName(tokenAddress);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
CompletableFuture<String> symbolFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return fetchSymbol(tokenAddress);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
CompletableFuture<String> decimalsFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return fetchDecimals(tokenAddress);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
// Combine all results
|
||||
CompletableFuture.allOf(nameFuture, symbolFuture, decimalsFuture)
|
||||
.thenAccept(v -> {
|
||||
try {
|
||||
Token token = new Token();
|
||||
token.setName(decodeString(nameFuture.get()));
|
||||
token.setSymbol(decodeString(symbolFuture.get()));
|
||||
token.setDecimals(decodeUint8(decimalsFuture.get()));
|
||||
|
||||
resultFuture.complete(Collections.singleton(token));
|
||||
} catch (Exception e) {
|
||||
resultFuture.completeExceptionally(e);
|
||||
}
|
||||
})
|
||||
.exceptionally(throwable -> {
|
||||
resultFuture.completeExceptionally(throwable);
|
||||
return null;
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
resultFuture.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String fetchName(String tokenAddress) throws Throwable {
|
||||
return makeEthCall(tokenAddress, NAME_SIGNATURE);
|
||||
}
|
||||
|
||||
private String fetchSymbol(String tokenAddress) throws Throwable {
|
||||
return makeEthCall(tokenAddress, SYMBOL_SIGNATURE);
|
||||
}
|
||||
|
||||
private String fetchDecimals(String tokenAddress) throws Throwable {
|
||||
return makeEthCall(tokenAddress, DECIMALS_SIGNATURE);
|
||||
}
|
||||
|
||||
private String makeEthCall(String toAddress, String data) throws Throwable {
|
||||
EthCallRequest request = new EthCallRequest(toAddress, data);
|
||||
Object[] params = new Object[]{request, LATEST_BLOCK};
|
||||
return jsonRpcClient.invoke("eth_call", params, String.class);
|
||||
}
|
||||
|
||||
private String decodeString(String hexData) {
|
||||
if (hexData == null || hexData.equals("0x") || hexData.length() < 66) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
try {
|
||||
// Remove 0x prefix
|
||||
String hex = hexData.substring(2);
|
||||
|
||||
// Skip the first 32 bytes (offset pointer)
|
||||
// The actual string length is at bytes 32-64
|
||||
String lengthHex = hex.substring(64, 128);
|
||||
int length = Integer.parseInt(lengthHex, 16);
|
||||
|
||||
if (length <= 0 || length > 100) { // Sanity check
|
||||
return "Invalid";
|
||||
}
|
||||
|
||||
// Extract the actual string data starting at byte 64
|
||||
String dataHex = hex.substring(128, 128 + (length * 2));
|
||||
|
||||
// Convert hex to string
|
||||
StringBuilder result = new StringBuilder();
|
||||
for (int i = 0; i < dataHex.length(); i += 2) {
|
||||
String hexChar = dataHex.substring(i, i + 2);
|
||||
int charCode = Integer.parseInt(hexChar, 16);
|
||||
if (charCode > 0) { // Skip null bytes
|
||||
result.append((char) charCode);
|
||||
}
|
||||
}
|
||||
|
||||
return result.toString();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error decoding string from hex: " + hexData + " - " + e.getMessage());
|
||||
return "DecodeError";
|
||||
}
|
||||
}
|
||||
|
||||
private int decodeUint8(String hexData) {
|
||||
if (hexData == null || hexData.equals("0x")) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
try {
|
||||
// Remove 0x prefix and get last byte (uint8)
|
||||
String hex = hexData.substring(2);
|
||||
// uint8 is the last 2 hex characters (1 byte)
|
||||
String uint8Hex = hex.substring(hex.length() - 2);
|
||||
return Integer.parseInt(uint8Hex, 16);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error decoding uint8 from hex: " + hexData + " - " + e.getMessage());
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private static class EthCallRequest {
|
||||
public final String to;
|
||||
public final String data;
|
||||
|
||||
public EthCallRequest(String to, String data) {
|
||||
this.to = to;
|
||||
this.data = data;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user