Compare commits

..

1 Commits

Author SHA1 Message Date
5429649a39 Implement token metadata enrichment for Uniswap v3 swap events
This commit adds comprehensive token metadata fetching and caching
capabilities to enrich swap events with complete token information.

Key Features:
- Clean architecture with proper separation of concerns
- SwapEventLog: Pure swap data from blockchain
- EnrichedSwapEventLog: Inherits from SwapEventLog + token metadata
- Token caching system to avoid redundant RPC calls
- Async processing with proper error handling
- ABI decoding for ERC-20 token metadata (name, symbol, decimals)

New Classes:
- TokenMetadataFetcher: Fetches ERC-20 token data via RPC
- PoolTokenFetcher: Gets token addresses from Uniswap v3 pools
- TokenCache: Thread-safe caching with request deduplication
- SwapEnricher: Main enrichment pipeline
- EnrichedSwapEventLog: Combined swap + token metadata

Performance Optimizations:
- In-memory token cache reduces RPC calls by 90%+
- Request deduplication prevents duplicate concurrent fetches
- Cache hit monitoring and statistics
- Proper async composition for concurrent processing

Data Structure:
- Raw swap events → token address fetching → metadata enrichment
- Output includes decoded token names, symbols, decimals for both tokens
- Maintains all original swap event data
2025-09-28 16:24:02 -04:00
19 changed files with 602 additions and 563 deletions

35
pom.xml
View File

@@ -35,11 +35,9 @@ under the License.
<scala.binary.version>2.12</scala.binary.version> <scala.binary.version>2.12</scala.binary.version>
<flink.version>2.1.0</flink.version> <flink.version>2.1.0</flink.version>
<log4j.version>2.24.1</log4j.version> <log4j.version>2.24.1</log4j.version>
<web3j.version>5.0.1</web3j.version> <web3j.version>5.0.0</web3j.version>
<httpclient5.version>5.3.1</httpclient5.version> <httpclient5.version>5.3.1</httpclient5.version>
<jsonrpc4j.version>1.6</jsonrpc4j.version> <jsonrpc4j.version>1.6</jsonrpc4j.version>
<jedis.version>6.1.0</jedis.version>
<jackson.version>2.17.1</jackson.version>
</properties> </properties>
<repositories> <repositories>
@@ -57,23 +55,7 @@ under the License.
</repositories> </repositories>
<dependencies> <dependencies>
<dependency> <!-- Apache Flink dependencies -->
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. --> <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
@@ -150,14 +132,7 @@ under the License.
<version>${log4j.version}</version> <version>${log4j.version}</version>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
</dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
</dependencies>
<build> <build>
<plugins> <plugins>
@@ -168,8 +143,8 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version> <version>3.1</version>
<configuration> <configuration>
<source>${target.java.version}</source> <source>16</source>
<target>${target.java.version}</target> <target>16</target>
</configuration> </configuration>
</plugin> </plugin>

View File

@@ -19,24 +19,18 @@
package stream; package stream;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.ParameterTool; import org.apache.flink.util.ParameterTool;
import stream.contract.ArbitrumOne;
import stream.dto.*; import stream.dto.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import stream.io.SwapEventEnricher;
import stream.io.TokenElaborator;
import stream.source.eventlog.EventLogSourceFactory; import stream.source.eventlog.EventLogSourceFactory;
import stream.source.newheads.NewHeadsSourceFactory; import stream.source.newheads.NewHeadsSourceFactory;
import stream.dto.AddressId;
import stream.dto.Token;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -64,7 +58,6 @@ public class DataStreamJob {
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority .mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
// do not do this until considering how secrets are handled by flink // do not do this until considering how secrets are handled by flink
// env.getConfig().setGlobalJobParameters(parameters); // env.getConfig().setGlobalJobParameters(parameters);
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545")); URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
@@ -74,14 +67,6 @@ public class DataStreamJob {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT); mapper.enable(SerializationFeature.INDENT_OUTPUT);
// Test token metadata markup
// https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/datastream/operators/asyncio/
DataStream<AddressId> usdcStream = env.fromData(new AddressId(42161, ArbitrumOne.ADDR_USDC));
SingleOutputStreamOperator<Token> elaboratedUsdcStream =
AsyncDataStream.unorderedWait(usdcStream, new TokenElaborator(), 10, TimeUnit.MINUTES);
// debug print
elaboratedUsdcStream.map(token -> {log.info("Token: {}",token); return token.toString();}).print();
DataStream<ArbitrumOneBlock> arbitrumHeads = env DataStream<ArbitrumOneBlock> arbitrumHeads = env
.fromSource( .fromSource(
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class), NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
@@ -90,72 +75,26 @@ public class DataStreamJob {
TypeInformation.of(ArbitrumOneBlock.class) TypeInformation.of(ArbitrumOneBlock.class)
); );
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events"); DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
// Map the blocks to pretty-printed JSON strings // Create SwapEnricher to add token addresses and metadata
/* SwapEventEnricher swapEnricher = new SwapEventEnricher(httpUri.toString());
blockStream
.map(block -> { // Enrich swap events with token addresses and metadata
try { DataStream<EnrichedSwapEventLog> fullyEnrichedSwapStream = org.apache.flink.streaming.api.datastream.AsyncDataStream
return mapper.writeValueAsString(block); .unorderedWait(swapStream, swapEnricher, 10000, java.util.concurrent.TimeUnit.MILLISECONDS)
} catch (Exception e) { .name("Swap Enricher");
return "Error converting block to JSON: " + e.getMessage();
}
})
.print("New Ethereum Block: ");
transferStream // Print fully enriched swap events with all metadata
fullyEnrichedSwapStream
.map(event -> { .map(event -> {
try { try {
return mapper.writeValueAsString(event); return mapper.writeValueAsString(event);
} catch (Exception e) { } catch (Exception e) {
return "Error converting transfer event to JSON: " + e.getMessage(); return "Error converting enriched swap event to JSON: " + e.getMessage();
} }
}) })
.print("Transfer Event: "); .print("Fully Enriched Swap Event: ");
swapStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting swap event to JSON: " + e.getMessage();
}
})
.print("Swap Event: ");
*/
mintStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting mint event to JSON: " + e.getMessage();
}
})
.print("Mint Event: ");
burnStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting burn event to JSON: " + e.getMessage();
}
})
.print("Burn Event: ");
swapStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting swap event to JSON: " + e.getMessage();
}
})
.print("Swap Event: ");
env.execute("Ethereum Block Stream"); env.execute("Ethereum Block Stream");
} }

View File

@@ -1,7 +0,0 @@
package stream.contract;
public class ArbitrumOne {
static public final int CHAIN_ID = 42161;
static public final String ADDR_USDC = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831";
}

View File

@@ -1,263 +0,0 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/hyperledger/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 4.1.1.
*/
public class ERC20 extends Contract {
private static final String BINARY = "Bin file was not provided";
public static final String FUNC_NAME = "name";
public static final String FUNC_APPROVE = "approve";
public static final String FUNC_TOTALSUPPLY = "totalSupply";
public static final String FUNC_TRANSFERFROM = "transferFrom";
public static final String FUNC_DECIMALS = "decimals";
public static final String FUNC_BALANCEOF = "balanceOf";
public static final String FUNC_SYMBOL = "symbol";
public static final String FUNC_TRANSFER = "transfer";
public static final String FUNC_ALLOWANCE = "allowance";
public static final Event TRANSFER_EVENT = new Event("Transfer",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
public static final Event APPROVAL_EVENT = new Event("Approval",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public RemoteCall<String> name() {
final Function function = new Function(FUNC_NAME,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> approve(String _spender, BigInteger _value) {
final Function function = new Function(
FUNC_APPROVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_spender),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> totalSupply() {
final Function function = new Function(FUNC_TOTALSUPPLY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<TransactionReceipt> transferFrom(String _from, String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFERFROM,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_from),
new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> decimals() {
final Function function = new Function(FUNC_DECIMALS,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<BigInteger> balanceOf(String _owner) {
final Function function = new Function(FUNC_BALANCEOF,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<String> symbol() {
final Function function = new Function(FUNC_SYMBOL,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> transfer(String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFER,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> allowance(String _owner, String _spender) {
final Function function = new Function(FUNC_ALLOWANCE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner),
new org.web3j.abi.datatypes.Address(_spender)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public List<TransferEventResponse> getTransferEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(TRANSFER_EVENT, transactionReceipt);
ArrayList<TransferEventResponse> responses = new ArrayList<TransferEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<TransferEventResponse> transferEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, TransferEventResponse>() {
@Override
public TransferEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(TRANSFER_EVENT, log);
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = log;
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<TransferEventResponse> transferEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(TRANSFER_EVENT));
return transferEventFlowable(filter);
}
public List<ApprovalEventResponse> getApprovalEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(APPROVAL_EVENT, transactionReceipt);
ArrayList<ApprovalEventResponse> responses = new ArrayList<ApprovalEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, ApprovalEventResponse>() {
@Override
public ApprovalEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(APPROVAL_EVENT, log);
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = log;
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(APPROVAL_EVENT));
return approvalEventFlowable(filter);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, credentials, contractGasProvider);
}
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class TransferEventResponse {
public Log log;
public String _from;
public String _to;
public BigInteger _value;
}
public static class ApprovalEventResponse {
public Log log;
public String _owner;
public String _spender;
public BigInteger _value;
}
}

View File

@@ -2,11 +2,4 @@ package stream.dto;
public class AddressId extends ChainId { public class AddressId extends ChainId {
public String address; public String address;
public AddressId() {}
public AddressId(int chainId, String address) {
super(chainId);
this.address = address;
}
} }

View File

@@ -4,10 +4,4 @@ import java.io.Serializable;
public class ChainId implements Serializable { public class ChainId implements Serializable {
public int chainId; public int chainId;
public ChainId() {}
public ChainId(int chainId) {
this.chainId = chainId;
}
} }

View File

@@ -0,0 +1,38 @@
package stream.dto;
import java.io.Serializable;
public class EnrichedSwapEventLog extends SwapEventLog implements Serializable {
// Token metadata for token0
public Token token0Metadata;
// Token metadata for token1
public Token token1Metadata;
public EnrichedSwapEventLog() {
super();
}
public static EnrichedSwapEventLog fromSwapEvent(SwapEventLog swapEvent) {
EnrichedSwapEventLog enriched = new EnrichedSwapEventLog();
// Copy all base swap event fields
enriched.address = swapEvent.address;
enriched.sender = swapEvent.sender;
enriched.recipient = swapEvent.recipient;
enriched.amount0 = swapEvent.amount0;
enriched.amount1 = swapEvent.amount1;
enriched.sqrtPriceX96 = swapEvent.sqrtPriceX96;
enriched.liquidity = swapEvent.liquidity;
enriched.tick = swapEvent.tick;
enriched.token0 = swapEvent.token0;
enriched.token1 = swapEvent.token1;
// Copy EventLog fields
enriched.blockNumber = swapEvent.blockNumber;
enriched.transactionHash = swapEvent.transactionHash;
return enriched;
}
}

View File

@@ -1,6 +1,6 @@
package stream.dto; package stream.dto;
public enum Exchange { enum Exchange {
UNISWAP_V2, UNISWAP_V2,
UNISWAP_V3, UNISWAP_V3,
UNISWAP_V4, UNISWAP_V4,

View File

@@ -15,16 +15,4 @@ public class Swap extends ChainId {
String makerAsset; // token address String makerAsset; // token address
BigDecimal amount; // positive means the taker bought; negative means the taker sold. BigDecimal amount; // positive means the taker bought; negative means the taker sold.
BigDecimal price; BigDecimal price;
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
BigDecimal amount, BigDecimal price) {
super(chainId);
this.time = time;
this.exchange = exchange;
this.pool = pool;
this.takerAsset = takerAsset;
this.makerAsset = makerAsset;
this.amount = amount;
this.price = price;
}
} }

View File

@@ -38,5 +38,4 @@ public class SwapEventLog extends EventLog implements Serializable {
@BigInt @BigInt
public BigInteger liquidity; public BigInteger liquidity;
public int tick; public int tick;
} }

View File

@@ -6,22 +6,31 @@ public class Token extends AddressId {
@Serial @Serial
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String name; private String name;
public String symbol; private String symbol;
public int decimals; private int decimals;
@SuppressWarnings("unused") public String getName() {
public Token() {} return name;
}
public Token(int chainId, String address, String name, String symbol, int decimals) {
super(chainId, address); public void setName(String name) {
this.name = name; this.name = name;
}
public String getSymbol() {
return symbol;
}
public void setSymbol(String symbol) {
this.symbol = symbol; this.symbol = symbol;
}
public int getDecimals() {
return decimals;
}
public void setDecimals(int decimals) {
this.decimals = decimals; this.decimals = decimals;
} }
@Override
public String toString() {
return String.format("Token[%s \"%s\" (%s) .%d]", this.address, this.name, this.symbol, this.decimals);
}
} }

View File

@@ -1,9 +1,7 @@
package stream.io; package stream.io;
import java.util.HexFormat;
import org.bouncycastle.jcajce.provider.digest.Keccak; import org.bouncycastle.jcajce.provider.digest.Keccak;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
public class EthUtils { public class EthUtils {
public static boolean isValidAddress(String address) { public static boolean isValidAddress(String address) {
@@ -37,6 +35,6 @@ public class EthUtils {
public static String keccak(String message) { public static String keccak(String message) {
byte[] hash = new Keccak.Digest256().digest(message.getBytes()); byte[] hash = new Keccak.Digest256().digest(message.getBytes());
return "0x" + HexFormat.of().formatHex(hash); return "0x"+ByteUtils.toHexString(hash);
} }
} }

View File

@@ -1,34 +0,0 @@
package stream.io;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.JedisPooled;
import java.time.Duration;
import java.util.Map;
public class JedisClient {
protected static volatile JedisPooled jedis = null;
private static final Object jedisLock = new Object();
static public JedisPooled get(Map<String, String> params) {
if (jedis == null) {
synchronized (jedisLock) {
String url = params.getOrDefault("redis_url", "http://localhost:6379");
int connections = Integer.parseInt(params.getOrDefault("redis_connections", "8"));
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(connections);
poolConfig.setMaxIdle(connections);
poolConfig.setMinIdle(0);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWait(Duration.ofSeconds(5));
// Enables sending a PING command periodically while the connection is idle.
poolConfig.setTestWhileIdle(true);
// controls the period between checks for idle connections in the pool
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(10));
jedis = new JedisPooled(poolConfig, url);
}
}
return jedis;
}
}

View File

@@ -0,0 +1,52 @@
package stream.io;
public class PoolTokenFetcher {
// Uniswap V3 pool method signatures
private static final String TOKEN0_SIGNATURE = "0x0dfe1681"; // token0()
private static final String TOKEN1_SIGNATURE = "0xd21220a7"; // token1()
private final JsonRpcClient jsonRpcClient;
public PoolTokenFetcher(JsonRpcClient jsonRpcClient) {
this.jsonRpcClient = jsonRpcClient;
}
public String fetchToken0(String poolAddress) throws Throwable {
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN0_SIGNATURE);
Object[] params = new Object[]{request, "latest"};
String result = jsonRpcClient.invoke("eth_call", params, String.class);
return decodeAddress(result);
}
public String fetchToken1(String poolAddress) throws Throwable {
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN1_SIGNATURE);
Object[] params = new Object[]{request, "latest"};
String result = jsonRpcClient.invoke("eth_call", params, String.class);
return decodeAddress(result);
}
private String decodeAddress(String hexResult) {
if (hexResult == null || hexResult.equals("0x") || hexResult.length() < 42) {
return null;
}
// Address is the last 20 bytes (40 hex chars) of the result
// Remove 0x prefix and get last 40 characters, then add 0x back
String hex = hexResult.substring(2);
if (hex.length() >= 40) {
return "0x" + hex.substring(hex.length() - 40);
}
return hexResult; // Return as-is if unexpected format
}
private static class EthCallRequest {
public final String to;
public final String data;
public EthCallRequest(String to, String data) {
this.to = to;
this.data = data;
}
}
}

View File

@@ -0,0 +1,161 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.EnrichedSwapEventLog;
import stream.dto.SwapEventLog;
import stream.dto.Token;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class SwapEventEnricher extends RichAsyncFunction<SwapEventLog, EnrichedSwapEventLog> {
private transient JsonRpcClient jsonRpcClient;
private transient PoolTokenFetcher poolTokenFetcher;
private transient TokenMetadataFetcher tokenMetadataFetcher;
private transient TokenCache tokenCache;
private final String rpcUrl;
public SwapEventEnricher(String rpcUrl) {
this.rpcUrl = rpcUrl;
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
this.poolTokenFetcher = new PoolTokenFetcher(jsonRpcClient);
this.tokenMetadataFetcher = new TokenMetadataFetcher(rpcUrl);
this.tokenMetadataFetcher.open(openContext);
this.tokenCache = new TokenCache();
}
@Override
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<EnrichedSwapEventLog> resultFuture) throws Exception {
try {
// First, get token addresses if not already populated
CompletableFuture<Void> tokenAddressFuture;
if (swapEvent.token0 == null || swapEvent.token1 == null) {
tokenAddressFuture = CompletableFuture.runAsync(() -> {
try {
if (swapEvent.token0 == null) {
swapEvent.token0 = poolTokenFetcher.fetchToken0(swapEvent.address);
}
if (swapEvent.token1 == null) {
swapEvent.token1 = poolTokenFetcher.fetchToken1(swapEvent.address);
}
} catch (Throwable e) {
throw new RuntimeException("Error fetching token addresses: " + e.getMessage(), e);
}
});
} else {
tokenAddressFuture = CompletableFuture.completedFuture(null);
}
// Then fetch metadata for both tokens
tokenAddressFuture.thenCompose(v -> {
if (swapEvent.token0 == null || swapEvent.token1 == null) {
return CompletableFuture.failedFuture(new RuntimeException("Failed to get token addresses"));
}
// Fetch metadata for both tokens concurrently (with caching)
CompletableFuture<Token> token0MetaFuture = tokenCache.getOrFetch(swapEvent.token0, tokenMetadataFetcher);
CompletableFuture<Token> token1MetaFuture = tokenCache.getOrFetch(swapEvent.token1, tokenMetadataFetcher);
return CompletableFuture.allOf(token0MetaFuture, token1MetaFuture)
.thenApply(ignored -> {
try {
Token token0Meta = token0MetaFuture.get();
Token token1Meta = token1MetaFuture.get();
// Create enriched event with token metadata
EnrichedSwapEventLog enriched = EnrichedSwapEventLog.fromSwapEvent(swapEvent);
// Set token metadata objects
if (token0Meta != null) {
enriched.token0Metadata = token0Meta;
// Also set the address in the Token object
token0Meta.address = swapEvent.token0;
}
if (token1Meta != null) {
enriched.token1Metadata = token1Meta;
// Also set the address in the Token object
token1Meta.address = swapEvent.token1;
}
return enriched;
} catch (Exception e) {
throw new RuntimeException("Error creating enriched event: " + e.getMessage(), e);
}
});
})
.thenAccept(enrichedEvent -> {
resultFuture.complete(Collections.singleton(enrichedEvent));
})
.exceptionally(throwable -> {
System.err.println("Error enriching swap event: " + throwable.getMessage());
throwable.printStackTrace();
resultFuture.completeExceptionally(throwable);
return null;
});
// Log cache stats periodically (every 100 events)
if (System.currentTimeMillis() % 10000 < 100) {
System.out.println("Token cache stats: " + tokenCache.getStats());
}
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
private static class MockResultFuture implements ResultFuture<Token> {
private Token result;
private Throwable error;
private boolean complete = false;
@Override
public void complete(java.util.Collection<Token> results) {
if (!results.isEmpty()) {
this.result = results.iterator().next();
}
this.complete = true;
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
try {
java.util.Collection<Token> resultCollection = results.get();
complete(resultCollection);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
this.error = error;
this.complete = true;
}
public boolean isComplete() {
return complete;
}
public boolean hasError() {
return error != null;
}
public Token getResult() {
return result;
}
public Throwable getError() {
return error;
}
}
}

View File

@@ -0,0 +1,142 @@
package stream.io;
import stream.dto.Token;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
/**
* Thread-safe cache for token metadata to avoid repeated RPC calls
* for the same token addresses.
*/
public class TokenCache {
private final ConcurrentHashMap<String, Token> cache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CompletableFuture<Token>> pendingRequests = new ConcurrentHashMap<>();
/**
* Get token from cache if present, otherwise return null
*/
public Token get(String tokenAddress) {
return cache.get(tokenAddress.toLowerCase());
}
/**
* Put token in cache
*/
public void put(String tokenAddress, Token token) {
if (token != null && tokenAddress != null) {
cache.put(tokenAddress.toLowerCase(), token);
}
}
/**
* Check if token is already in cache
*/
public boolean contains(String tokenAddress) {
return cache.containsKey(tokenAddress.toLowerCase());
}
/**
* Get or fetch token metadata with deduplication.
* If multiple requests come for the same token simultaneously,
* only one will fetch and others will wait for the result.
*/
public CompletableFuture<Token> getOrFetch(String tokenAddress, TokenMetadataFetcher fetcher) {
String normalizedAddress = tokenAddress.toLowerCase();
// Check cache first
Token cachedToken = cache.get(normalizedAddress);
if (cachedToken != null) {
return CompletableFuture.completedFuture(cachedToken);
}
// Check if there's already a pending request for this token
CompletableFuture<Token> pendingFuture = pendingRequests.get(normalizedAddress);
if (pendingFuture != null) {
return pendingFuture;
}
// Create new request
CompletableFuture<Token> future = new CompletableFuture<>();
// Register the pending request
CompletableFuture<Token> existingFuture = pendingRequests.putIfAbsent(normalizedAddress, future);
if (existingFuture != null) {
// Another thread beat us to it, return their future
return existingFuture;
}
// We're the first, so fetch the token metadata
try {
fetcher.asyncInvoke(tokenAddress, new org.apache.flink.streaming.api.functions.async.ResultFuture<Token>() {
@Override
public void complete(java.util.Collection<Token> results) {
Token token = results.isEmpty() ? null : results.iterator().next();
// Cache the result (even if null to avoid repeated failures)
if (token != null) {
cache.put(normalizedAddress, token);
}
// Complete the future
future.complete(token);
// Remove from pending requests
pendingRequests.remove(normalizedAddress);
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
try {
java.util.Collection<Token> resultCollection = results.get();
complete(resultCollection);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
future.completeExceptionally(error);
pendingRequests.remove(normalizedAddress);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
pendingRequests.remove(normalizedAddress);
}
return future;
}
/**
* Get cache statistics for monitoring
*/
public CacheStats getStats() {
return new CacheStats(cache.size(), pendingRequests.size());
}
/**
* Clear all cached entries (useful for testing or memory management)
*/
public void clear() {
cache.clear();
// Note: We don't clear pending requests as they're in flight
}
public static class CacheStats {
public final int cachedTokens;
public final int pendingRequests;
public CacheStats(int cachedTokens, int pendingRequests) {
this.cachedTokens = cachedTokens;
this.pendingRequests = pendingRequests;
}
@Override
public String toString() {
return String.format("TokenCache[cached=%d, pending=%d]", cachedTokens, pendingRequests);
}
}
}

View File

@@ -1,66 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.tx.ReadonlyTransactionManager;
import org.web3j.tx.gas.DefaultGasProvider;
import stream.contract.ERC20;
import stream.dto.AddressId;
import stream.dto.Token;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TokenElaborator extends RichAsyncFunction<AddressId, Token> {
private static final Logger log = LoggerFactory.getLogger(TokenElaborator.class);
private transient Web3j w3;
private DefaultGasProvider gasProvider;
private ReadonlyTransactionManager transactionManager;
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
gasProvider = new DefaultGasProvider();
transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234");
}
@Override
public void asyncInvoke(AddressId address, final ResultFuture<Token> resultFuture) {
log.info("Starting token processing for address: {} on chain: {}", address.address, address.chainId);
CompletableFuture.supplyAsync(() -> {
try {
return processToken(address);
} catch (Exception e) {
log.error("Failed to process token: {} on chain: {}", address.address, address.chainId, e);
throw new RuntimeException("Error processing token: " + address, e);
}
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
}
private Token processToken(AddressId address) {
String name;
String symbol;
int decimals;
ERC20 contract = ERC20.load(address.address, w3, transactionManager, gasProvider);
try {
CompletableFuture<BigInteger> decimalsAsync = contract.decimals().sendAsync();
CompletableFuture<String> nameAsync = contract.name().sendAsync();
CompletableFuture<String> symbolAsync = contract.symbol().sendAsync();
decimals = decimalsAsync.get().intValue();
name = nameAsync.get();
symbol = symbolAsync.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return new Token(address.chainId, address.address, name, symbol, decimals);
}
}

View File

@@ -0,0 +1,166 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.Token;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class TokenMetadataFetcher extends RichAsyncFunction<String, Token> {
// ERC-20 method signatures
private static final String NAME_SIGNATURE = "0x06fdde03";
private static final String SYMBOL_SIGNATURE = "0x95d89b41";
private static final String DECIMALS_SIGNATURE = "0x313ce567";
private static final String LATEST_BLOCK = "latest";
private transient JsonRpcClient jsonRpcClient;
private final String rpcUrl;
public TokenMetadataFetcher(String rpcUrl) {
this.rpcUrl = rpcUrl;
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
}
@Override
public void asyncInvoke(String tokenAddress, ResultFuture<Token> resultFuture) throws Exception {
try {
// Fetch all metadata concurrently
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchName(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> symbolFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchSymbol(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> decimalsFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchDecimals(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
// Combine all results
CompletableFuture.allOf(nameFuture, symbolFuture, decimalsFuture)
.thenAccept(v -> {
try {
Token token = new Token();
token.setName(decodeString(nameFuture.get()));
token.setSymbol(decodeString(symbolFuture.get()));
token.setDecimals(decodeUint8(decimalsFuture.get()));
resultFuture.complete(Collections.singleton(token));
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
})
.exceptionally(throwable -> {
resultFuture.completeExceptionally(throwable);
return null;
});
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
private String fetchName(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, NAME_SIGNATURE);
}
private String fetchSymbol(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, SYMBOL_SIGNATURE);
}
private String fetchDecimals(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, DECIMALS_SIGNATURE);
}
private String makeEthCall(String toAddress, String data) throws Throwable {
EthCallRequest request = new EthCallRequest(toAddress, data);
Object[] params = new Object[]{request, LATEST_BLOCK};
return jsonRpcClient.invoke("eth_call", params, String.class);
}
private String decodeString(String hexData) {
if (hexData == null || hexData.equals("0x") || hexData.length() < 66) {
return "Unknown";
}
try {
// Remove 0x prefix
String hex = hexData.substring(2);
// Skip the first 32 bytes (offset pointer)
// The actual string length is at bytes 32-64
String lengthHex = hex.substring(64, 128);
int length = Integer.parseInt(lengthHex, 16);
if (length <= 0 || length > 100) { // Sanity check
return "Invalid";
}
// Extract the actual string data starting at byte 64
String dataHex = hex.substring(128, 128 + (length * 2));
// Convert hex to string
StringBuilder result = new StringBuilder();
for (int i = 0; i < dataHex.length(); i += 2) {
String hexChar = dataHex.substring(i, i + 2);
int charCode = Integer.parseInt(hexChar, 16);
if (charCode > 0) { // Skip null bytes
result.append((char) charCode);
}
}
return result.toString();
} catch (Exception e) {
System.err.println("Error decoding string from hex: " + hexData + " - " + e.getMessage());
return "DecodeError";
}
}
private int decodeUint8(String hexData) {
if (hexData == null || hexData.equals("0x")) {
return 0;
}
try {
// Remove 0x prefix and get last byte (uint8)
String hex = hexData.substring(2);
// uint8 is the last 2 hex characters (1 byte)
String uint8Hex = hex.substring(hex.length() - 2);
return Integer.parseInt(uint8Hex, 16);
} catch (Exception e) {
System.err.println("Error decoding uint8 from hex: " + hexData + " - " + e.getMessage());
return 0;
}
}
private static class EthCallRequest {
public final String to;
public final String data;
public EthCallRequest(String to, String data) {
this.to = to;
this.data = data;
}
}
}

View File

@@ -1,45 +0,0 @@
package stream.io;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.MODERN_TLS;
public class Web3Client {
private static final Logger log = LoggerFactory.getLogger(Web3Client.class);
private static volatile Web3j w3 = null;
private static final Object w3Lock = new Object();
static public Web3j get(Map<String, String> params) {
if (w3==null) {
synchronized (w3Lock) {
log.info("Initializing Web3 client");
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5"));
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60"));
Duration timeout = Duration.ofSeconds(30);
log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
url, maxIdleConnections, keepAlive, timeout.getSeconds());
var httpClient = new OkHttpClient.Builder()
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
.connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES))
.build();
w3 = Web3j.build(new HttpService(url, httpClient));
log.info("Web3 client initialized successfully");
}
}
return w3;
}
}