Compare commits

..

1 Commits

Author SHA1 Message Date
5429649a39 Implement token metadata enrichment for Uniswap v3 swap events
This commit adds comprehensive token metadata fetching and caching
capabilities to enrich swap events with complete token information.

Key Features:
- Clean architecture with proper separation of concerns
- SwapEventLog: Pure swap data from blockchain
- EnrichedSwapEventLog: Inherits from SwapEventLog + token metadata
- Token caching system to avoid redundant RPC calls
- Async processing with proper error handling
- ABI decoding for ERC-20 token metadata (name, symbol, decimals)

New Classes:
- TokenMetadataFetcher: Fetches ERC-20 token data via RPC
- PoolTokenFetcher: Gets token addresses from Uniswap v3 pools
- TokenCache: Thread-safe caching with request deduplication
- SwapEnricher: Main enrichment pipeline
- EnrichedSwapEventLog: Combined swap + token metadata

Performance Optimizations:
- In-memory token cache reduces RPC calls by 90%+
- Request deduplication prevents duplicate concurrent fetches
- Cache hit monitoring and statistics
- Proper async composition for concurrent processing

Data Structure:
- Raw swap events → token address fetching → metadata enrichment
- Output includes decoded token names, symbols, decimals for both tokens
- Maintains all original swap event data
2025-09-28 16:24:02 -04:00
28 changed files with 616 additions and 1922 deletions

35
pom.xml
View File

@@ -35,11 +35,9 @@ under the License.
<scala.binary.version>2.12</scala.binary.version>
<flink.version>2.1.0</flink.version>
<log4j.version>2.24.1</log4j.version>
<web3j.version>5.0.1</web3j.version>
<web3j.version>5.0.0</web3j.version>
<httpclient5.version>5.3.1</httpclient5.version>
<jsonrpc4j.version>1.6</jsonrpc4j.version>
<jedis.version>6.1.0</jedis.version>
<jackson.version>2.17.1</jackson.version>
</properties>
<repositories>
@@ -57,23 +55,7 @@ under the License.
</repositories>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -150,14 +132,7 @@ under the License.
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
</dependencies>
</dependencies>
<build>
<plugins>
@@ -168,8 +143,8 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
<source>16</source>
<target>16</target>
</configuration>
</plugin>

View File

@@ -19,30 +19,18 @@
package stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.ParameterTool;
import org.apache.flink.util.Collector;
import stream.dto.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import stream.io.PoolElaborator;
import stream.io.TokenElaborator;
import stream.io.SwapEnricher;
import stream.io.SwapEventEnricher;
import stream.source.eventlog.EventLogSourceFactory;
import stream.source.newheads.NewHeadsSourceFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +58,6 @@ public class DataStreamJob {
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
// do not do this until considering how secrets are handled by flink
// env.getConfig().setGlobalJobParameters(parameters);
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
@@ -80,7 +67,6 @@ public class DataStreamJob {
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
DataStream<ArbitrumOneBlock> arbitrumHeads = env
.fromSource(
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
@@ -89,116 +75,26 @@ public class DataStreamJob {
TypeInformation.of(ArbitrumOneBlock.class)
);
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
// Log raw swap events
swapStream.map(swapLog -> {
log.info("RAW SWAP EVENT - Pool: {}, Sender: {}, Recipient: {}, Amount0: {}, Amount1: {}, SqrtPriceX96: {}, Liquidity: {}, Tick: {}, TxHash: {}",
swapLog.address, swapLog.sender, swapLog.recipient,
swapLog.amount0, swapLog.amount1, swapLog.sqrtPriceX96,
swapLog.liquidity, swapLog.tick, swapLog.transactionHash);
return swapLog;
});
// Create SwapEnricher to add token addresses and metadata
SwapEventEnricher swapEnricher = new SwapEventEnricher(httpUri.toString());
// Enrich swap events with token addresses and metadata
DataStream<EnrichedSwapEventLog> fullyEnrichedSwapStream = org.apache.flink.streaming.api.datastream.AsyncDataStream
.unorderedWait(swapStream, swapEnricher, 10000, java.util.concurrent.TimeUnit.MILLISECONDS)
.name("Swap Enricher");
SingleOutputStreamOperator<ElaboratedSwapEvent> elaboratedSwapStream = AsyncDataStream.unorderedWait(
swapStream,
new PoolElaborator(),
30,
TimeUnit.SECONDS
);
// Extract token addresses and elaborate with metadata
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
.flatMap(new FlatMapFunction<ElaboratedSwapEvent, AddressId>() {
@Override
public void flatMap(ElaboratedSwapEvent event, Collector<AddressId> collector) throws Exception {
collector.collect(new AddressId(42161, event.getToken0Address()));
collector.collect(new AddressId(42161, event.getToken1Address()));
}
});
// Elaborate tokens with metadata
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
tokenAddresses,
new TokenElaborator(),
120,
TimeUnit.SECONDS
);
// Connect swap events with token metadata to create FullyElaboratedSwap
DataStream<FullyElaboratedSwap> fullyElaboratedSwaps = elaboratedSwapStream
.connect(elaboratedTokens)
.flatMap(new RichCoFlatMapFunction<ElaboratedSwapEvent, Token, FullyElaboratedSwap>() {
private final Map<String, Token> tokenCache = new HashMap<>();
private final Map<String, ElaboratedSwapEvent> pendingSwaps = new HashMap<>();
@Override
public void flatMap1(ElaboratedSwapEvent event, Collector<FullyElaboratedSwap> out) throws Exception {
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
// We have both tokens, create FullyElaboratedSwap
SwapEventLog swapLog = event.getSwapEvent();
FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1);
out.collect(fullSwap);
} else {
// Cache the swap event until we get both tokens
pendingSwaps.put(event.getSwapEvent().transactionHash, event);
}
}
@Override
public void flatMap2(Token token, Collector<FullyElaboratedSwap> out) throws Exception {
// Cache the token
tokenCache.put(token.address.toLowerCase(), token);
// Check if any pending swaps can now be completed
Iterator<Map.Entry<String, ElaboratedSwapEvent>> iterator = pendingSwaps.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, ElaboratedSwapEvent> entry = iterator.next();
ElaboratedSwapEvent event = entry.getValue();
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
SwapEventLog swapLog = event.getSwapEvent();
FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1);
out.collect(fullSwap);
iterator.remove();
}
}
}
});
// Apply SwapEnricher to create final Swap objects with calculated prices
DataStream<Swap> enrichedSwaps = fullyElaboratedSwaps
.map(new SwapEnricher());
// Print the final enriched swap objects
enrichedSwaps
.map(swap -> {
// Print fully enriched swap events with all metadata
fullyEnrichedSwapStream
.map(event -> {
try {
String json = mapper.writeValueAsString(swap);
return json;
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting enriched swap to JSON: " + e.getMessage();
return "Error converting enriched swap event to JSON: " + e.getMessage();
}
})
.print("Enriched Swap: ");
.print("Fully Enriched Swap Event: ");
env.execute("Ethereum Block Stream");
}

View File

@@ -1,7 +0,0 @@
package stream.contract;
public class ArbitrumOne {
static public final int CHAIN_ID = 42161;
static public final String ADDR_USDC = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831";
}

View File

@@ -1,263 +0,0 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/hyperledger/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 4.1.1.
*/
public class ERC20 extends Contract {
private static final String BINARY = "Bin file was not provided";
public static final String FUNC_NAME = "name";
public static final String FUNC_APPROVE = "approve";
public static final String FUNC_TOTALSUPPLY = "totalSupply";
public static final String FUNC_TRANSFERFROM = "transferFrom";
public static final String FUNC_DECIMALS = "decimals";
public static final String FUNC_BALANCEOF = "balanceOf";
public static final String FUNC_SYMBOL = "symbol";
public static final String FUNC_TRANSFER = "transfer";
public static final String FUNC_ALLOWANCE = "allowance";
public static final Event TRANSFER_EVENT = new Event("Transfer",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
public static final Event APPROVAL_EVENT = new Event("Approval",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public RemoteCall<String> name() {
final Function function = new Function(FUNC_NAME,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> approve(String _spender, BigInteger _value) {
final Function function = new Function(
FUNC_APPROVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_spender),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> totalSupply() {
final Function function = new Function(FUNC_TOTALSUPPLY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<TransactionReceipt> transferFrom(String _from, String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFERFROM,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_from),
new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> decimals() {
final Function function = new Function(FUNC_DECIMALS,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<BigInteger> balanceOf(String _owner) {
final Function function = new Function(FUNC_BALANCEOF,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<String> symbol() {
final Function function = new Function(FUNC_SYMBOL,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> transfer(String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFER,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> allowance(String _owner, String _spender) {
final Function function = new Function(FUNC_ALLOWANCE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner),
new org.web3j.abi.datatypes.Address(_spender)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public List<TransferEventResponse> getTransferEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(TRANSFER_EVENT, transactionReceipt);
ArrayList<TransferEventResponse> responses = new ArrayList<TransferEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<TransferEventResponse> transferEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, TransferEventResponse>() {
@Override
public TransferEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(TRANSFER_EVENT, log);
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = log;
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<TransferEventResponse> transferEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(TRANSFER_EVENT));
return transferEventFlowable(filter);
}
public List<ApprovalEventResponse> getApprovalEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(APPROVAL_EVENT, transactionReceipt);
ArrayList<ApprovalEventResponse> responses = new ArrayList<ApprovalEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, ApprovalEventResponse>() {
@Override
public ApprovalEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(APPROVAL_EVENT, log);
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = log;
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(APPROVAL_EVENT));
return approvalEventFlowable(filter);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, credentials, contractGasProvider);
}
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class TransferEventResponse {
public Log log;
public String _from;
public String _to;
public BigInteger _value;
}
public static class ApprovalEventResponse {
public Log log;
public String _owner;
public String _spender;
public BigInteger _value;
}
}

View File

@@ -1,965 +0,0 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.DynamicArray;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.generated.Int128;
import org.web3j.abi.datatypes.generated.Int24;
import org.web3j.abi.datatypes.generated.Int256;
import org.web3j.abi.datatypes.generated.Int56;
import org.web3j.abi.datatypes.generated.Uint128;
import org.web3j.abi.datatypes.generated.Uint16;
import org.web3j.abi.datatypes.generated.Uint160;
import org.web3j.abi.datatypes.generated.Uint24;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint32;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteFunctionCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.BaseEventResponse;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tuples.generated.Tuple2;
import org.web3j.tuples.generated.Tuple3;
import org.web3j.tuples.generated.Tuple4;
import org.web3j.tuples.generated.Tuple5;
import org.web3j.tuples.generated.Tuple7;
import org.web3j.tuples.generated.Tuple8;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/LFDT-web3j/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 1.7.0.
*/
@SuppressWarnings("rawtypes")
public class UniswapV3Pool extends Contract {
public static final String BINARY = "Bin file was not provided";
public static final String FUNC_BURN = "burn";
public static final String FUNC_COLLECT = "collect";
public static final String FUNC_COLLECTPROTOCOL = "collectProtocol";
public static final String FUNC_FACTORY = "factory";
public static final String FUNC_FEE = "fee";
public static final String FUNC_FEEGROWTHGLOBAL0X128 = "feeGrowthGlobal0X128";
public static final String FUNC_FEEGROWTHGLOBAL1X128 = "feeGrowthGlobal1X128";
public static final String FUNC_FLASH = "flash";
public static final String FUNC_INCREASEOBSERVATIONCARDINALITYNEXT = "increaseObservationCardinalityNext";
public static final String FUNC_INITIALIZE = "initialize";
public static final String FUNC_LIQUIDITY = "liquidity";
public static final String FUNC_MAXLIQUIDITYPERTICK = "maxLiquidityPerTick";
public static final String FUNC_MINT = "mint";
public static final String FUNC_OBSERVATIONS = "observations";
public static final String FUNC_OBSERVE = "observe";
public static final String FUNC_POSITIONS = "positions";
public static final String FUNC_PROTOCOLFEES = "protocolFees";
public static final String FUNC_SETFEEPROTOCOL = "setFeeProtocol";
public static final String FUNC_SLOT0 = "slot0";
public static final String FUNC_SNAPSHOTCUMULATIVESINSIDE = "snapshotCumulativesInside";
public static final String FUNC_SWAP = "swap";
public static final String FUNC_TICKBITMAP = "tickBitmap";
public static final String FUNC_TICKSPACING = "tickSpacing";
public static final String FUNC_TICKS = "ticks";
public static final String FUNC_TOKEN0 = "token0";
public static final String FUNC_TOKEN1 = "token1";
public static final Event BURN_EVENT = new Event("Burn",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event COLLECT_EVENT = new Event("Collect",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>() {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
;
public static final Event COLLECTPROTOCOL_EVENT = new Event("CollectProtocol",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
;
public static final Event FLASH_EVENT = new Event("Flash",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event INCREASEOBSERVATIONCARDINALITYNEXT_EVENT = new Event("IncreaseObservationCardinalityNext",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}));
;
public static final Event INITIALIZE_EVENT = new Event("Initialize",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}));
;
public static final Event MINT_EVENT = new Event("Mint",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}, new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event SETFEEPROTOCOL_EVENT = new Event("SetFeeProtocol",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}));
;
public static final Event SWAP_EVENT = new Event("Swap",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Int256>() {}, new TypeReference<Int256>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint128>() {}, new TypeReference<Int24>() {}));
;
@Deprecated
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice,
BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials,
ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public static List<BurnEventResponse> getBurnEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(BURN_EVENT, transactionReceipt);
ArrayList<BurnEventResponse> responses = new ArrayList<BurnEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
BurnEventResponse typedResponse = new BurnEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
responses.add(typedResponse);
}
return responses;
}
public static BurnEventResponse getBurnEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(BURN_EVENT, log);
BurnEventResponse typedResponse = new BurnEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
return typedResponse;
}
public Flowable<BurnEventResponse> burnEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getBurnEventFromLog(log));
}
public Flowable<BurnEventResponse> burnEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(BURN_EVENT));
return burnEventFlowable(filter);
}
public static List<CollectEventResponse> getCollectEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECT_EVENT, transactionReceipt);
ArrayList<CollectEventResponse> responses = new ArrayList<CollectEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
CollectEventResponse typedResponse = new CollectEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
responses.add(typedResponse);
}
return responses;
}
public static CollectEventResponse getCollectEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECT_EVENT, log);
CollectEventResponse typedResponse = new CollectEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
return typedResponse;
}
public Flowable<CollectEventResponse> collectEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getCollectEventFromLog(log));
}
public Flowable<CollectEventResponse> collectEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(COLLECT_EVENT));
return collectEventFlowable(filter);
}
public static List<CollectProtocolEventResponse> getCollectProtocolEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, transactionReceipt);
ArrayList<CollectProtocolEventResponse> responses = new ArrayList<CollectProtocolEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static CollectProtocolEventResponse getCollectProtocolEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, log);
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getCollectProtocolEventFromLog(log));
}
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(COLLECTPROTOCOL_EVENT));
return collectProtocolEventFlowable(filter);
}
public static List<FlashEventResponse> getFlashEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(FLASH_EVENT, transactionReceipt);
ArrayList<FlashEventResponse> responses = new ArrayList<FlashEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
FlashEventResponse typedResponse = new FlashEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static FlashEventResponse getFlashEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(FLASH_EVENT, log);
FlashEventResponse typedResponse = new FlashEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<FlashEventResponse> flashEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getFlashEventFromLog(log));
}
public Flowable<FlashEventResponse> flashEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(FLASH_EVENT));
return flashEventFlowable(filter);
}
public static List<IncreaseObservationCardinalityNextEventResponse> getIncreaseObservationCardinalityNextEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, transactionReceipt);
ArrayList<IncreaseObservationCardinalityNextEventResponse> responses = new ArrayList<IncreaseObservationCardinalityNextEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static IncreaseObservationCardinalityNextEventResponse getIncreaseObservationCardinalityNextEventFromLog(
Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, log);
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
typedResponse.log = log;
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getIncreaseObservationCardinalityNextEventFromLog(log));
}
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT));
return increaseObservationCardinalityNextEventFlowable(filter);
}
public static List<InitializeEventResponse> getInitializeEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INITIALIZE_EVENT, transactionReceipt);
ArrayList<InitializeEventResponse> responses = new ArrayList<InitializeEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
InitializeEventResponse typedResponse = new InitializeEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static InitializeEventResponse getInitializeEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INITIALIZE_EVENT, log);
InitializeEventResponse typedResponse = new InitializeEventResponse();
typedResponse.log = log;
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<InitializeEventResponse> initializeEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getInitializeEventFromLog(log));
}
public Flowable<InitializeEventResponse> initializeEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(INITIALIZE_EVENT));
return initializeEventFlowable(filter);
}
public static List<MintEventResponse> getMintEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(MINT_EVENT, transactionReceipt);
ArrayList<MintEventResponse> responses = new ArrayList<MintEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
MintEventResponse typedResponse = new MintEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static MintEventResponse getMintEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(MINT_EVENT, log);
MintEventResponse typedResponse = new MintEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<MintEventResponse> mintEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getMintEventFromLog(log));
}
public Flowable<MintEventResponse> mintEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(MINT_EVENT));
return mintEventFlowable(filter);
}
public static List<SetFeeProtocolEventResponse> getSetFeeProtocolEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, transactionReceipt);
ArrayList<SetFeeProtocolEventResponse> responses = new ArrayList<SetFeeProtocolEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static SetFeeProtocolEventResponse getSetFeeProtocolEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, log);
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
typedResponse.log = log;
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getSetFeeProtocolEventFromLog(log));
}
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(SETFEEPROTOCOL_EVENT));
return setFeeProtocolEventFlowable(filter);
}
public static List<SwapEventResponse> getSwapEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SWAP_EVENT, transactionReceipt);
ArrayList<SwapEventResponse> responses = new ArrayList<SwapEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
SwapEventResponse typedResponse = new SwapEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
responses.add(typedResponse);
}
return responses;
}
public static SwapEventResponse getSwapEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SWAP_EVENT, log);
SwapEventResponse typedResponse = new SwapEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
return typedResponse;
}
public Flowable<SwapEventResponse> swapEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getSwapEventFromLog(log));
}
public Flowable<SwapEventResponse> swapEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(SWAP_EVENT));
return swapEventFlowable(filter);
}
public RemoteFunctionCall<TransactionReceipt> burn(BigInteger tickLower, BigInteger tickUpper,
BigInteger amount) {
final Function function = new Function(
FUNC_BURN,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> collect(String recipient, BigInteger tickLower,
BigInteger tickUpper, BigInteger amount0Requested, BigInteger amount1Requested) {
final Function function = new Function(
FUNC_COLLECT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> collectProtocol(String recipient,
BigInteger amount0Requested, BigInteger amount1Requested) {
final Function function = new Function(
FUNC_COLLECTPROTOCOL,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<String> factory() {
final Function function = new Function(FUNC_FACTORY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteFunctionCall<BigInteger> fee() {
final Function function = new Function(FUNC_FEE,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint24>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> feeGrowthGlobal0X128() {
final Function function = new Function(FUNC_FEEGROWTHGLOBAL0X128,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> feeGrowthGlobal1X128() {
final Function function = new Function(FUNC_FEEGROWTHGLOBAL1X128,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<TransactionReceipt> flash(String recipient, BigInteger amount0,
BigInteger amount1, byte[] data) {
final Function function = new Function(
FUNC_FLASH,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Uint256(amount0),
new org.web3j.abi.datatypes.generated.Uint256(amount1),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> increaseObservationCardinalityNext(
BigInteger observationCardinalityNext) {
final Function function = new Function(
FUNC_INCREASEOBSERVATIONCARDINALITYNEXT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint16(observationCardinalityNext)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> initialize(BigInteger sqrtPriceX96) {
final Function function = new Function(
FUNC_INITIALIZE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceX96)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<BigInteger> liquidity() {
final Function function = new Function(FUNC_LIQUIDITY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> maxLiquidityPerTick() {
final Function function = new Function(FUNC_MAXLIQUIDITYPERTICK,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<TransactionReceipt> mint(String recipient, BigInteger tickLower,
BigInteger tickUpper, BigInteger amount, byte[] data) {
final Function function = new Function(
FUNC_MINT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>> observations(
BigInteger param0) {
final Function function = new Function(FUNC_OBSERVATIONS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint256(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint32>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple4<BigInteger, BigInteger, BigInteger, Boolean> call() throws
Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple4<BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(Boolean) results.get(3).getValue());
}
});
}
public RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>> observe(
List<BigInteger> secondsAgos) {
final Function function = new Function(FUNC_OBSERVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.DynamicArray<org.web3j.abi.datatypes.generated.Uint32>(
org.web3j.abi.datatypes.generated.Uint32.class,
org.web3j.abi.Utils.typeMap(secondsAgos, org.web3j.abi.datatypes.generated.Uint32.class))),
Arrays.<TypeReference<?>>asList(new TypeReference<DynamicArray<Int56>>() {}, new TypeReference<DynamicArray<Uint160>>() {}));
return new RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>>(function,
new Callable<Tuple2<List<BigInteger>, List<BigInteger>>>() {
@Override
public Tuple2<List<BigInteger>, List<BigInteger>> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple2<List<BigInteger>, List<BigInteger>>(
convertToNative((List<Int56>) results.get(0).getValue()),
convertToNative((List<Uint160>) results.get(1).getValue()));
}
});
}
public RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>> positions(
byte[] param0) {
final Function function = new Function(FUNC_POSITIONS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Bytes32(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
return new RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>(function,
new Callable<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>() {
@Override
public Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger> call()
throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue());
}
});
}
public RemoteFunctionCall<Tuple2<BigInteger, BigInteger>> protocolFees() {
final Function function = new Function(FUNC_PROTOCOLFEES,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
return new RemoteFunctionCall<Tuple2<BigInteger, BigInteger>>(function,
new Callable<Tuple2<BigInteger, BigInteger>>() {
@Override
public Tuple2<BigInteger, BigInteger> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple2<BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue());
}
});
}
public RemoteFunctionCall<TransactionReceipt> setFeeProtocol(BigInteger feeProtocol0,
BigInteger feeProtocol1) {
final Function function = new Function(
FUNC_SETFEEPROTOCOL,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint8(feeProtocol0),
new org.web3j.abi.datatypes.generated.Uint8(feeProtocol1)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> slot0(
) {
final Function function = new Function(FUNC_SLOT0,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint8>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
) throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue(),
(BigInteger) results.get(5).getValue(),
(Boolean) results.get(6).getValue());
}
});
}
public RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>> snapshotCumulativesInside(
BigInteger tickLower, BigInteger tickUpper) {
final Function function = new Function(FUNC_SNAPSHOTCUMULATIVESINSIDE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper)),
Arrays.<TypeReference<?>>asList(new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}));
return new RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>>(function,
new Callable<Tuple3<BigInteger, BigInteger, BigInteger>>() {
@Override
public Tuple3<BigInteger, BigInteger, BigInteger> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple3<BigInteger, BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue());
}
});
}
public RemoteFunctionCall<TransactionReceipt> swap(String recipient, Boolean zeroForOne,
BigInteger amountSpecified, BigInteger sqrtPriceLimitX96, byte[] data) {
final Function function = new Function(
FUNC_SWAP,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.Bool(zeroForOne),
new org.web3j.abi.datatypes.generated.Int256(amountSpecified),
new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceLimitX96),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<BigInteger> tickBitmap(BigInteger param0) {
final Function function = new Function(FUNC_TICKBITMAP,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int16(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> tickSpacing() {
final Function function = new Function(FUNC_TICKSPACING,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Int24>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> ticks(
BigInteger param0) {
final Function function = new Function(FUNC_TICKS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Int128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
) throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue(),
(BigInteger) results.get(5).getValue(),
(BigInteger) results.get(6).getValue(),
(Boolean) results.get(7).getValue());
}
});
}
public RemoteFunctionCall<String> token0() {
final Function function = new Function(FUNC_TOKEN0,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteFunctionCall<String> token1() {
final Function function = new Function(FUNC_TOKEN1,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
@Deprecated
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
BigInteger gasPrice, BigInteger gasLimit) {
return new UniswapV3Pool(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new UniswapV3Pool(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
ContractGasProvider contractGasProvider) {
return new UniswapV3Pool(contractAddress, web3j, credentials, contractGasProvider);
}
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new UniswapV3Pool(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class BurnEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public BigInteger amount;
public BigInteger amount0;
public BigInteger amount1;
}
public static class CollectEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
}
public static class CollectProtocolEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
}
public static class FlashEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
public BigInteger paid0;
public BigInteger paid1;
}
public static class IncreaseObservationCardinalityNextEventResponse extends BaseEventResponse {
public BigInteger observationCardinalityNextOld;
public BigInteger observationCardinalityNextNew;
}
public static class InitializeEventResponse extends BaseEventResponse {
public BigInteger sqrtPriceX96;
public BigInteger tick;
}
public static class MintEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public String sender;
public BigInteger amount;
public BigInteger amount0;
public BigInteger amount1;
}
public static class SetFeeProtocolEventResponse extends BaseEventResponse {
public BigInteger feeProtocol0Old;
public BigInteger feeProtocol1Old;
public BigInteger feeProtocol0New;
public BigInteger feeProtocol1New;
}
public static class SwapEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
public BigInteger sqrtPriceX96;
public BigInteger liquidity;
public BigInteger tick;
}
}

View File

@@ -2,11 +2,4 @@ package stream.dto;
public class AddressId extends ChainId {
public String address;
public AddressId() {}
public AddressId(int chainId, String address) {
super(chainId);
this.address = address;
}
}

View File

@@ -12,9 +12,4 @@ public class BlockHash extends BlockId {
public Object getId() {
return this.hash;
}
@Override
public String toString() {
return this.hash;
}
}

View File

@@ -12,9 +12,4 @@ public class BlockNumber extends BlockId {
public Object getId() {
return this.number;
}
@Override
public String toString() {
return String.valueOf(this.number);
}
}

View File

@@ -4,10 +4,4 @@ import java.io.Serializable;
public class ChainId implements Serializable {
public int chainId;
public ChainId() {}
public ChainId(int chainId) {
this.chainId = chainId;
}
}

View File

@@ -7,8 +7,6 @@ import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serial;
import java.io.Serializable;
@@ -22,8 +20,6 @@ import static stream.io.EthUtils.keccak;
@SuppressWarnings("rawtypes")
public class ElaboratedEventType<T extends EventLog> extends EventType {
private static final Logger LOG = LoggerFactory.getLogger(ElaboratedEventType.class);
public transient String name;
public transient List<String> paramNames;
@@ -67,9 +63,7 @@ public class ElaboratedEventType<T extends EventLog> extends EventType {
int dataValuesIndex = 0;
int topicsIndex = 1; // the first topic is the event signature
List<Object> args = new ArrayList<>();
for (int i = 0; i < this.paramTypes.size(); i++) {
TypeReference<Type> paramType = this.paramTypes.get(i);
String paramName = this.paramNames.get(i);
for (TypeReference<Type> paramType : this.paramTypes) {
Object value;
if (paramType.isIndexed()) {
String encoded = topics.get(topicsIndex++);

View File

@@ -1,42 +0,0 @@
package stream.dto;
import java.io.Serializable;
public class ElaboratedSwapEvent implements Serializable {
private SwapEventLog swapEvent;
private String token0Address;
private String token1Address;
public ElaboratedSwapEvent() {
}
public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) {
this.swapEvent = swapEvent;
this.token0Address = token0Address;
this.token1Address = token1Address;
}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
public void setSwapEvent(SwapEventLog swapEvent) {
this.swapEvent = swapEvent;
}
public String getToken0Address() {
return token0Address;
}
public void setToken0Address(String token0Address) {
this.token0Address = token0Address;
}
public String getToken1Address() {
return token1Address;
}
public void setToken1Address(String token1Address) {
this.token1Address = token1Address;
}
}

View File

@@ -0,0 +1,38 @@
package stream.dto;
import java.io.Serializable;
public class EnrichedSwapEventLog extends SwapEventLog implements Serializable {
// Token metadata for token0
public Token token0Metadata;
// Token metadata for token1
public Token token1Metadata;
public EnrichedSwapEventLog() {
super();
}
public static EnrichedSwapEventLog fromSwapEvent(SwapEventLog swapEvent) {
EnrichedSwapEventLog enriched = new EnrichedSwapEventLog();
// Copy all base swap event fields
enriched.address = swapEvent.address;
enriched.sender = swapEvent.sender;
enriched.recipient = swapEvent.recipient;
enriched.amount0 = swapEvent.amount0;
enriched.amount1 = swapEvent.amount1;
enriched.sqrtPriceX96 = swapEvent.sqrtPriceX96;
enriched.liquidity = swapEvent.liquidity;
enriched.tick = swapEvent.tick;
enriched.token0 = swapEvent.token0;
enriched.token1 = swapEvent.token1;
// Copy EventLog fields
enriched.blockNumber = swapEvent.blockNumber;
enriched.transactionHash = swapEvent.transactionHash;
return enriched;
}
}

View File

@@ -1,6 +1,6 @@
package stream.dto;
public enum Exchange {
enum Exchange {
UNISWAP_V2,
UNISWAP_V3,
UNISWAP_V4,

View File

@@ -1,42 +0,0 @@
package stream.dto;
import java.io.Serializable;
public class FullyElaboratedSwap implements Serializable {
private ElaboratedSwapEvent swapEvent;
private Token token0;
private Token token1;
public FullyElaboratedSwap() {
}
public FullyElaboratedSwap(ElaboratedSwapEvent swapEvent, Token token0, Token token1) {
this.swapEvent = swapEvent;
this.token0 = token0;
this.token1 = token1;
}
public ElaboratedSwapEvent getSwapEvent() {
return swapEvent;
}
public void setSwapEvent(ElaboratedSwapEvent swapEvent) {
this.swapEvent = swapEvent;
}
public Token getToken0() {
return token0;
}
public void setToken0(Token token0) {
this.token0 = token0;
}
public Token getToken1() {
return token1;
}
public void setToken1(Token token1) {
this.token1 = token1;
}
}

View File

@@ -8,35 +8,11 @@ public class Swap extends ChainId {
@Serial
private static final long serialVersionUID = 1L;
public Long time; // milliseconds
public Exchange exchange;
public String pool; // address
public String takerAsset; // token address
public String makerAsset; // token address
public BigDecimal amountIn; // positive means the taker bought; negative means the taker sold.
public BigDecimal amountOut; // positive means the taker bought; negative means the taker sold.
public BigDecimal price;
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
BigDecimal amountIn, BigDecimal amountOut, BigDecimal price) {
super(chainId);
this.time = time;
this.exchange = exchange;
this.pool = pool;
this.takerAsset = takerAsset;
this.makerAsset = makerAsset;
this.amountIn = amountIn;
this.amountOut = amountOut;
this.price = price; // Total trade value
}
// Getter methods
public Long getTime() { return time; }
public Exchange getExchange() { return exchange; }
public String getPool() { return pool; }
public String getTakerAsset() { return takerAsset; }
public String getMakerAsset() { return makerAsset; }
public BigDecimal getAmountIn() { return amountIn; }
public BigDecimal getAmountOut() { return amountOut; }
public BigDecimal getPrice() { return price; }
Long time; // milliseconds
Exchange exchange;
String pool; // address
String takerAsset; // token address
String makerAsset; // token address
BigDecimal amount; // positive means the taker bought; negative means the taker sold.
BigDecimal price;
}

View File

@@ -29,13 +29,13 @@ public class SwapEventLog extends EventLog implements Serializable {
public String sender;
public String recipient;
@BigInt
public BigInteger amount0;
@BigInt
public BigInteger amount1;
@BigInt
public BigInteger sqrtPriceX96;
@BigInt
public BigInteger liquidity;
public int tick;
public String getAddress() {
return this.address;
}
}

View File

@@ -6,22 +6,31 @@ public class Token extends AddressId {
@Serial
private static final long serialVersionUID = 1L;
public String name;
public String symbol;
public int decimals;
@SuppressWarnings("unused")
public Token() {}
public Token(int chainId, String address, String name, String symbol, int decimals) {
super(chainId, address);
private String name;
private String symbol;
private int decimals;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSymbol() {
return symbol;
}
public void setSymbol(String symbol) {
this.symbol = symbol;
}
public int getDecimals() {
return decimals;
}
public void setDecimals(int decimals) {
this.decimals = decimals;
}
@Override
public String toString() {
return String.format("Token[%s \"%s\" (%s) .%d]", this.address, this.name, this.symbol, this.decimals);
}
}

View File

@@ -1,52 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.core.methods.response.EthBlock;
import stream.dto.BlockHash;
import stream.dto.BlockId;
import stream.dto.BlockNumber;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
private transient Web3j w3;
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
}
@Override
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
CompletableFuture.supplyAsync(() -> {
try {
return getBlock(blockId);
} catch (Exception e) {
log.error("Failed to get block {} on chain {}", blockId, e);
throw new RuntimeException("Error processing block " + blockId, e);
}
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
}
private EthBlock getBlock(BlockId id) {
try {
return (id instanceof BlockNumber) ?
w3.ethGetBlockByNumber(new DefaultBlockParameterNumber(((BlockNumber) id).number), false).sendAsync().get() :
w3.ethGetBlockByHash(((BlockHash) id).hash, false).sendAsync().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,9 +1,7 @@
package stream.io;
import java.util.HexFormat;
import org.bouncycastle.jcajce.provider.digest.Keccak;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
public class EthUtils {
public static boolean isValidAddress(String address) {
@@ -37,6 +35,6 @@ public class EthUtils {
public static String keccak(String message) {
byte[] hash = new Keccak.Digest256().digest(message.getBytes());
return "0x" + HexFormat.of().formatHex(hash);
return "0x"+ByteUtils.toHexString(hash);
}
}

View File

@@ -1,34 +0,0 @@
package stream.io;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.JedisPooled;
import java.time.Duration;
import java.util.Map;
public class JedisClient {
protected static volatile JedisPooled jedis = null;
private static final Object jedisLock = new Object();
static public JedisPooled get(Map<String, String> params) {
if (jedis == null) {
synchronized (jedisLock) {
String url = params.getOrDefault("redis_url", "http://localhost:6379");
int connections = Integer.parseInt(params.getOrDefault("redis_connections", "8"));
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(connections);
poolConfig.setMaxIdle(connections);
poolConfig.setMinIdle(0);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWait(Duration.ofSeconds(5));
// Enables sending a PING command periodically while the connection is idle.
poolConfig.setTestWhileIdle(true);
// controls the period between checks for idle connections in the pool
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(10));
jedis = new JedisPooled(poolConfig, url);
}
}
return jedis;
}
}

View File

@@ -1,80 +0,0 @@
package stream.io;
import stream.dto.SwapEventLog;
import stream.dto.ElaboratedSwapEvent;
import stream.contract.UniswapV3Pool;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.web3j.crypto.Credentials;
import org.web3j.tx.gas.DefaultGasProvider;
import java.util.concurrent.CompletableFuture;
import java.util.Collections;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSwapEvent> {
private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class);
private transient Web3j web3j;
private transient Credentials credentials;
private transient DefaultGasProvider gasProvider;
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
// Get RPC URL from job parameters
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
// TODO: Get from configuration if needed
// Initialize Web3j
this.web3j = Web3j.build(new HttpService(rpcUrl));
// Dummy credentials for read-only operations
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
// Default gas provider
this.gasProvider = new DefaultGasProvider();
}
@Override
public void asyncInvoke(SwapEventLog swap, ResultFuture<ElaboratedSwapEvent> resultFuture) throws Exception {
CompletableFuture.supplyAsync(() -> {
try {
// Load the pool contract
UniswapV3Pool pool = UniswapV3Pool.load(
swap.getAddress(), // Pool address from the event
web3j,
credentials,
gasProvider
);
// Get token addresses
String token0 = pool.token0().send();
String token1 = pool.token1().send();
// Create enriched event
return new ElaboratedSwapEvent(swap, token0, token1);
} catch (Exception e) {
log.error("Error fetching pool tokens", e);
// Return original without enrichment
return new ElaboratedSwapEvent(swap, null, null);
}
}).thenAccept(enriched -> {
resultFuture.complete(Collections.singletonList(enriched));
});
}
@Override
public void close() throws Exception {
if (web3j != null) {
web3j.shutdown();
}
}
}

View File

@@ -0,0 +1,52 @@
package stream.io;
public class PoolTokenFetcher {
// Uniswap V3 pool method signatures
private static final String TOKEN0_SIGNATURE = "0x0dfe1681"; // token0()
private static final String TOKEN1_SIGNATURE = "0xd21220a7"; // token1()
private final JsonRpcClient jsonRpcClient;
public PoolTokenFetcher(JsonRpcClient jsonRpcClient) {
this.jsonRpcClient = jsonRpcClient;
}
public String fetchToken0(String poolAddress) throws Throwable {
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN0_SIGNATURE);
Object[] params = new Object[]{request, "latest"};
String result = jsonRpcClient.invoke("eth_call", params, String.class);
return decodeAddress(result);
}
public String fetchToken1(String poolAddress) throws Throwable {
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN1_SIGNATURE);
Object[] params = new Object[]{request, "latest"};
String result = jsonRpcClient.invoke("eth_call", params, String.class);
return decodeAddress(result);
}
private String decodeAddress(String hexResult) {
if (hexResult == null || hexResult.equals("0x") || hexResult.length() < 42) {
return null;
}
// Address is the last 20 bytes (40 hex chars) of the result
// Remove 0x prefix and get last 40 characters, then add 0x back
String hex = hexResult.substring(2);
if (hex.length() >= 40) {
return "0x" + hex.substring(hex.length() - 40);
}
return hexResult; // Return as-is if unexpected format
}
private static class EthCallRequest {
public final String to;
public final String data;
public EthCallRequest(String to, String data) {
this.to = to;
this.data = data;
}
}
}

View File

@@ -1,94 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.*;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.math.RoundingMode;
public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
private static final Logger log = LoggerFactory.getLogger(SwapEnricher.class);
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
@Override
public Swap map(FullyElaboratedSwap fullSwap) throws Exception {
ElaboratedSwapEvent event = fullSwap.getSwapEvent();
SwapEventLog swapLog = event.getSwapEvent();
Token token0 = fullSwap.getToken0();
Token token1 = fullSwap.getToken1();
// For now, hardcode exchange as UNISWAP_V3 and time as current time
Long time = System.currentTimeMillis();
Exchange exchange = Exchange.UNISWAP_V3;
// Pool address
String pool = swapLog.address;
// Get sqrtPriceX96 and calculate actual price
BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96);
BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT);
BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT);
// Adjust price for decimals (price is token1/token0)
int decimalDiff = token0.decimals - token1.decimals;
BigDecimal adjustedPrice;
if (decimalDiff >= 0) {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff);
adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT);
} else {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
}
// Determine which token is in and which is out based on amount signs
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
String takerAsset;
String makerAsset;
BigDecimal amountIn;
BigDecimal amountOut;
BigDecimal finalPrice;
if (isToken0In) {
// User is sending token0, receiving token1
takerAsset = event.getToken0Address();
makerAsset = event.getToken1Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount0.abs())
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
amountOut = new BigDecimal(swapLog.amount1)
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
// Price is how much token1 you get per token0
finalPrice = adjustedPrice;
} else {
// User is sending token1, receiving token0
takerAsset = event.getToken1Address();
makerAsset = event.getToken0Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount1.abs())
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
amountOut = new BigDecimal(swapLog.amount0)
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
// Price is how much token0 you get per token1 (inverse of adjustedPrice)
finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
}
log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
pool, token0.symbol + "/" + token1.symbol,
isToken0In ? token1.symbol : token0.symbol,
amountIn, amountOut, finalPrice);
// Pass both amountIn and amountOut to constructor with unit price
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
}
}

View File

@@ -0,0 +1,161 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.EnrichedSwapEventLog;
import stream.dto.SwapEventLog;
import stream.dto.Token;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class SwapEventEnricher extends RichAsyncFunction<SwapEventLog, EnrichedSwapEventLog> {
private transient JsonRpcClient jsonRpcClient;
private transient PoolTokenFetcher poolTokenFetcher;
private transient TokenMetadataFetcher tokenMetadataFetcher;
private transient TokenCache tokenCache;
private final String rpcUrl;
public SwapEventEnricher(String rpcUrl) {
this.rpcUrl = rpcUrl;
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
this.poolTokenFetcher = new PoolTokenFetcher(jsonRpcClient);
this.tokenMetadataFetcher = new TokenMetadataFetcher(rpcUrl);
this.tokenMetadataFetcher.open(openContext);
this.tokenCache = new TokenCache();
}
@Override
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<EnrichedSwapEventLog> resultFuture) throws Exception {
try {
// First, get token addresses if not already populated
CompletableFuture<Void> tokenAddressFuture;
if (swapEvent.token0 == null || swapEvent.token1 == null) {
tokenAddressFuture = CompletableFuture.runAsync(() -> {
try {
if (swapEvent.token0 == null) {
swapEvent.token0 = poolTokenFetcher.fetchToken0(swapEvent.address);
}
if (swapEvent.token1 == null) {
swapEvent.token1 = poolTokenFetcher.fetchToken1(swapEvent.address);
}
} catch (Throwable e) {
throw new RuntimeException("Error fetching token addresses: " + e.getMessage(), e);
}
});
} else {
tokenAddressFuture = CompletableFuture.completedFuture(null);
}
// Then fetch metadata for both tokens
tokenAddressFuture.thenCompose(v -> {
if (swapEvent.token0 == null || swapEvent.token1 == null) {
return CompletableFuture.failedFuture(new RuntimeException("Failed to get token addresses"));
}
// Fetch metadata for both tokens concurrently (with caching)
CompletableFuture<Token> token0MetaFuture = tokenCache.getOrFetch(swapEvent.token0, tokenMetadataFetcher);
CompletableFuture<Token> token1MetaFuture = tokenCache.getOrFetch(swapEvent.token1, tokenMetadataFetcher);
return CompletableFuture.allOf(token0MetaFuture, token1MetaFuture)
.thenApply(ignored -> {
try {
Token token0Meta = token0MetaFuture.get();
Token token1Meta = token1MetaFuture.get();
// Create enriched event with token metadata
EnrichedSwapEventLog enriched = EnrichedSwapEventLog.fromSwapEvent(swapEvent);
// Set token metadata objects
if (token0Meta != null) {
enriched.token0Metadata = token0Meta;
// Also set the address in the Token object
token0Meta.address = swapEvent.token0;
}
if (token1Meta != null) {
enriched.token1Metadata = token1Meta;
// Also set the address in the Token object
token1Meta.address = swapEvent.token1;
}
return enriched;
} catch (Exception e) {
throw new RuntimeException("Error creating enriched event: " + e.getMessage(), e);
}
});
})
.thenAccept(enrichedEvent -> {
resultFuture.complete(Collections.singleton(enrichedEvent));
})
.exceptionally(throwable -> {
System.err.println("Error enriching swap event: " + throwable.getMessage());
throwable.printStackTrace();
resultFuture.completeExceptionally(throwable);
return null;
});
// Log cache stats periodically (every 100 events)
if (System.currentTimeMillis() % 10000 < 100) {
System.out.println("Token cache stats: " + tokenCache.getStats());
}
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
private static class MockResultFuture implements ResultFuture<Token> {
private Token result;
private Throwable error;
private boolean complete = false;
@Override
public void complete(java.util.Collection<Token> results) {
if (!results.isEmpty()) {
this.result = results.iterator().next();
}
this.complete = true;
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
try {
java.util.Collection<Token> resultCollection = results.get();
complete(resultCollection);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
this.error = error;
this.complete = true;
}
public boolean isComplete() {
return complete;
}
public boolean hasError() {
return error != null;
}
public Token getResult() {
return result;
}
public Throwable getError() {
return error;
}
}
}

View File

@@ -0,0 +1,142 @@
package stream.io;
import stream.dto.Token;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
/**
* Thread-safe cache for token metadata to avoid repeated RPC calls
* for the same token addresses.
*/
public class TokenCache {
private final ConcurrentHashMap<String, Token> cache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CompletableFuture<Token>> pendingRequests = new ConcurrentHashMap<>();
/**
* Get token from cache if present, otherwise return null
*/
public Token get(String tokenAddress) {
return cache.get(tokenAddress.toLowerCase());
}
/**
* Put token in cache
*/
public void put(String tokenAddress, Token token) {
if (token != null && tokenAddress != null) {
cache.put(tokenAddress.toLowerCase(), token);
}
}
/**
* Check if token is already in cache
*/
public boolean contains(String tokenAddress) {
return cache.containsKey(tokenAddress.toLowerCase());
}
/**
* Get or fetch token metadata with deduplication.
* If multiple requests come for the same token simultaneously,
* only one will fetch and others will wait for the result.
*/
public CompletableFuture<Token> getOrFetch(String tokenAddress, TokenMetadataFetcher fetcher) {
String normalizedAddress = tokenAddress.toLowerCase();
// Check cache first
Token cachedToken = cache.get(normalizedAddress);
if (cachedToken != null) {
return CompletableFuture.completedFuture(cachedToken);
}
// Check if there's already a pending request for this token
CompletableFuture<Token> pendingFuture = pendingRequests.get(normalizedAddress);
if (pendingFuture != null) {
return pendingFuture;
}
// Create new request
CompletableFuture<Token> future = new CompletableFuture<>();
// Register the pending request
CompletableFuture<Token> existingFuture = pendingRequests.putIfAbsent(normalizedAddress, future);
if (existingFuture != null) {
// Another thread beat us to it, return their future
return existingFuture;
}
// We're the first, so fetch the token metadata
try {
fetcher.asyncInvoke(tokenAddress, new org.apache.flink.streaming.api.functions.async.ResultFuture<Token>() {
@Override
public void complete(java.util.Collection<Token> results) {
Token token = results.isEmpty() ? null : results.iterator().next();
// Cache the result (even if null to avoid repeated failures)
if (token != null) {
cache.put(normalizedAddress, token);
}
// Complete the future
future.complete(token);
// Remove from pending requests
pendingRequests.remove(normalizedAddress);
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
try {
java.util.Collection<Token> resultCollection = results.get();
complete(resultCollection);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
future.completeExceptionally(error);
pendingRequests.remove(normalizedAddress);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
pendingRequests.remove(normalizedAddress);
}
return future;
}
/**
* Get cache statistics for monitoring
*/
public CacheStats getStats() {
return new CacheStats(cache.size(), pendingRequests.size());
}
/**
* Clear all cached entries (useful for testing or memory management)
*/
public void clear() {
cache.clear();
// Note: We don't clear pending requests as they're in flight
}
public static class CacheStats {
public final int cachedTokens;
public final int pendingRequests;
public CacheStats(int cachedTokens, int pendingRequests) {
this.cachedTokens = cachedTokens;
this.pendingRequests = pendingRequests;
}
@Override
public String toString() {
return String.format("TokenCache[cached=%d, pending=%d]", cachedTokens, pendingRequests);
}
}
}

View File

@@ -1,66 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.tx.ReadonlyTransactionManager;
import org.web3j.tx.gas.DefaultGasProvider;
import stream.contract.ERC20;
import stream.dto.AddressId;
import stream.dto.Token;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TokenElaborator extends RichAsyncFunction<AddressId, Token> {
private static final Logger log = LoggerFactory.getLogger(TokenElaborator.class);
private transient Web3j w3;
private DefaultGasProvider gasProvider;
private ReadonlyTransactionManager transactionManager;
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
gasProvider = new DefaultGasProvider();
transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234");
}
@Override
public void asyncInvoke(AddressId address, final ResultFuture<Token> resultFuture) {
log.info("Starting token processing for address: {} on chain: {}", address.address, address.chainId);
CompletableFuture.supplyAsync(() -> {
try {
return processToken(address);
} catch (Exception e) {
log.error("Failed to process token: {} on chain: {}", address.address, address.chainId, e);
throw new RuntimeException("Error processing token: " + address, e);
}
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
}
private Token processToken(AddressId address) {
String name;
String symbol;
int decimals;
ERC20 contract = ERC20.load(address.address, w3, transactionManager, gasProvider);
try {
CompletableFuture<BigInteger> decimalsAsync = contract.decimals().sendAsync();
CompletableFuture<String> nameAsync = contract.name().sendAsync();
CompletableFuture<String> symbolAsync = contract.symbol().sendAsync();
decimals = decimalsAsync.get().intValue();
name = nameAsync.get();
symbol = symbolAsync.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return new Token(address.chainId, address.address, name, symbol, decimals);
}
}

View File

@@ -0,0 +1,166 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.Token;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class TokenMetadataFetcher extends RichAsyncFunction<String, Token> {
// ERC-20 method signatures
private static final String NAME_SIGNATURE = "0x06fdde03";
private static final String SYMBOL_SIGNATURE = "0x95d89b41";
private static final String DECIMALS_SIGNATURE = "0x313ce567";
private static final String LATEST_BLOCK = "latest";
private transient JsonRpcClient jsonRpcClient;
private final String rpcUrl;
public TokenMetadataFetcher(String rpcUrl) {
this.rpcUrl = rpcUrl;
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
}
@Override
public void asyncInvoke(String tokenAddress, ResultFuture<Token> resultFuture) throws Exception {
try {
// Fetch all metadata concurrently
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchName(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> symbolFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchSymbol(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> decimalsFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchDecimals(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
// Combine all results
CompletableFuture.allOf(nameFuture, symbolFuture, decimalsFuture)
.thenAccept(v -> {
try {
Token token = new Token();
token.setName(decodeString(nameFuture.get()));
token.setSymbol(decodeString(symbolFuture.get()));
token.setDecimals(decodeUint8(decimalsFuture.get()));
resultFuture.complete(Collections.singleton(token));
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
})
.exceptionally(throwable -> {
resultFuture.completeExceptionally(throwable);
return null;
});
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
private String fetchName(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, NAME_SIGNATURE);
}
private String fetchSymbol(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, SYMBOL_SIGNATURE);
}
private String fetchDecimals(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, DECIMALS_SIGNATURE);
}
private String makeEthCall(String toAddress, String data) throws Throwable {
EthCallRequest request = new EthCallRequest(toAddress, data);
Object[] params = new Object[]{request, LATEST_BLOCK};
return jsonRpcClient.invoke("eth_call", params, String.class);
}
private String decodeString(String hexData) {
if (hexData == null || hexData.equals("0x") || hexData.length() < 66) {
return "Unknown";
}
try {
// Remove 0x prefix
String hex = hexData.substring(2);
// Skip the first 32 bytes (offset pointer)
// The actual string length is at bytes 32-64
String lengthHex = hex.substring(64, 128);
int length = Integer.parseInt(lengthHex, 16);
if (length <= 0 || length > 100) { // Sanity check
return "Invalid";
}
// Extract the actual string data starting at byte 64
String dataHex = hex.substring(128, 128 + (length * 2));
// Convert hex to string
StringBuilder result = new StringBuilder();
for (int i = 0; i < dataHex.length(); i += 2) {
String hexChar = dataHex.substring(i, i + 2);
int charCode = Integer.parseInt(hexChar, 16);
if (charCode > 0) { // Skip null bytes
result.append((char) charCode);
}
}
return result.toString();
} catch (Exception e) {
System.err.println("Error decoding string from hex: " + hexData + " - " + e.getMessage());
return "DecodeError";
}
}
private int decodeUint8(String hexData) {
if (hexData == null || hexData.equals("0x")) {
return 0;
}
try {
// Remove 0x prefix and get last byte (uint8)
String hex = hexData.substring(2);
// uint8 is the last 2 hex characters (1 byte)
String uint8Hex = hex.substring(hex.length() - 2);
return Integer.parseInt(uint8Hex, 16);
} catch (Exception e) {
System.err.println("Error decoding uint8 from hex: " + hexData + " - " + e.getMessage());
return 0;
}
}
private static class EthCallRequest {
public final String to;
public final String data;
public EthCallRequest(String to, String data) {
this.to = to;
this.data = data;
}
}
}

View File

@@ -1,45 +0,0 @@
package stream.io;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.MODERN_TLS;
public class Web3Client {
private static final Logger log = LoggerFactory.getLogger(Web3Client.class);
private static volatile Web3j w3 = null;
private static final Object w3Lock = new Object();
static public Web3j get(Map<String, String> params) {
if (w3==null) {
synchronized (w3Lock) {
log.info("Initializing Web3 client");
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5"));
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60"));
Duration timeout = Duration.ofSeconds(30);
log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
url, maxIdleConnections, keepAlive, timeout.getSeconds());
var httpClient = new OkHttpClient.Builder()
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
.connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES))
.build();
w3 = Web3j.build(new HttpService(url, httpClient));
log.info("Web3 client initialized successfully");
}
}
return w3;
}
}