1 Commits

Author SHA1 Message Date
5429649a39 Implement token metadata enrichment for Uniswap v3 swap events
This commit adds comprehensive token metadata fetching and caching
capabilities to enrich swap events with complete token information.

Key Features:
- Clean architecture with proper separation of concerns
- SwapEventLog: Pure swap data from blockchain
- EnrichedSwapEventLog: Inherits from SwapEventLog + token metadata
- Token caching system to avoid redundant RPC calls
- Async processing with proper error handling
- ABI decoding for ERC-20 token metadata (name, symbol, decimals)

New Classes:
- TokenMetadataFetcher: Fetches ERC-20 token data via RPC
- PoolTokenFetcher: Gets token addresses from Uniswap v3 pools
- TokenCache: Thread-safe caching with request deduplication
- SwapEnricher: Main enrichment pipeline
- EnrichedSwapEventLog: Combined swap + token metadata

Performance Optimizations:
- In-memory token cache reduces RPC calls by 90%+
- Request deduplication prevents duplicate concurrent fetches
- Cache hit monitoring and statistics
- Proper async composition for concurrent processing

Data Structure:
- Raw swap events → token address fetching → metadata enrichment
- Output includes decoded token names, symbols, decimals for both tokens
- Maintains all original swap event data
2025-09-28 16:24:02 -04:00
34 changed files with 627 additions and 2310 deletions

35
pom.xml
View File

@@ -35,11 +35,9 @@ under the License.
<scala.binary.version>2.12</scala.binary.version> <scala.binary.version>2.12</scala.binary.version>
<flink.version>2.1.0</flink.version> <flink.version>2.1.0</flink.version>
<log4j.version>2.24.1</log4j.version> <log4j.version>2.24.1</log4j.version>
<web3j.version>5.0.1</web3j.version> <web3j.version>5.0.0</web3j.version>
<httpclient5.version>5.3.1</httpclient5.version> <httpclient5.version>5.3.1</httpclient5.version>
<jsonrpc4j.version>1.6</jsonrpc4j.version> <jsonrpc4j.version>1.6</jsonrpc4j.version>
<jedis.version>6.1.0</jedis.version>
<jackson.version>2.17.1</jackson.version>
</properties> </properties>
<repositories> <repositories>
@@ -57,23 +55,7 @@ under the License.
</repositories> </repositories>
<dependencies> <dependencies>
<dependency> <!-- Apache Flink dependencies -->
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. --> <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
@@ -150,14 +132,7 @@ under the License.
<version>${log4j.version}</version> <version>${log4j.version}</version>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
</dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
</dependencies>
<build> <build>
<plugins> <plugins>
@@ -168,8 +143,8 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version> <version>3.1</version>
<configuration> <configuration>
<source>${target.java.version}</source> <source>16</source>
<target>${target.java.version}</target> <target>16</target>
</configuration> </configuration>
</plugin> </plugin>

View File

@@ -19,32 +19,18 @@
package stream; package stream;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.ParameterTool; import org.apache.flink.util.ParameterTool;
import org.apache.flink.util.Collector;
import stream.config.StreamingDefaults;
import stream.dto.*; import stream.dto.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import stream.io.SwapEventEnricher;
import stream.io.PoolTokenIdElaborator;
import stream.io.TokenElaborator;
import stream.io.SwapElaborator;
import stream.io.BlockTimestampElaborator;
import stream.ohlc.OHLCPipeline;
import stream.source.eventlog.EventLogSourceFactory; import stream.source.eventlog.EventLogSourceFactory;
import stream.source.newheads.NewHeadsSourceFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -72,150 +58,43 @@ public class DataStreamJob {
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority .mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters); // do not do this until considering how secrets are handled by flink
// env.getConfig().setGlobalJobParameters(parameters);
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546")); URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
// Async operation parameters
int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY);
int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS);
log.info("Async configuration - Capacity: {}, Timeout: {}s", asyncCapacity, asyncTimeoutSeconds);
// Create ObjectMapper for pretty JSON printing // Create ObjectMapper for pretty JSON printing
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT); mapper.enable(SerializationFeature.INDENT_OUTPUT);
DataStream<ArbitrumOneBlock> arbitrumHeads = env
.fromSource(
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
"ArbitrumOne Head Blocks",
TypeInformation.of(ArbitrumOneBlock.class)
);
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events"); DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
// Add block timestamp to swap events // Create SwapEnricher to add token addresses and metadata
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait( SwapEventEnricher swapEnricher = new SwapEventEnricher(httpUri.toString());
swapStream,
new BlockTimestampElaborator(), // Enrich swap events with token addresses and metadata
asyncTimeoutSeconds, DataStream<EnrichedSwapEventLog> fullyEnrichedSwapStream = org.apache.flink.streaming.api.datastream.AsyncDataStream
TimeUnit.SECONDS, .unorderedWait(swapStream, swapEnricher, 10000, java.util.concurrent.TimeUnit.MILLISECONDS)
asyncCapacity .name("Swap Enricher");
);
SingleOutputStreamOperator<SwapEventWithTokenIds> elaboratedSwapStream = AsyncDataStream.unorderedWait( // Print fully enriched swap events with all metadata
swapWithTimestampStream, fullyEnrichedSwapStream
new PoolTokenIdElaborator(), .map(event -> {
asyncTimeoutSeconds,
TimeUnit.SECONDS,
asyncCapacity
);
// Extract token addresses and elaborate with metadata
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
.flatMap(new FlatMapFunction<SwapEventWithTokenIds, AddressId>() {
@Override
public void flatMap(SwapEventWithTokenIds event, Collector<AddressId> collector) throws Exception {
collector.collect(new AddressId(42161, event.getToken0Address()));
collector.collect(new AddressId(42161, event.getToken1Address()));
}
});
// Elaborate tokens with metadata
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
tokenAddresses,
new TokenElaborator(),
asyncTimeoutSeconds,
TimeUnit.SECONDS,
asyncCapacity
);
// Connect swap events with token metadata to create SwapEventWithTokenMetadata
DataStream<SwapEventWithTokenMetadata> swapsWithTokens = elaboratedSwapStream
.connect(elaboratedTokens)
.flatMap(new RichCoFlatMapFunction<SwapEventWithTokenIds, Token, SwapEventWithTokenMetadata>() {
private final Map<String, Token> tokenCache = new HashMap<>();
private final Map<String, SwapEventWithTokenIds> pendingSwaps = new HashMap<>();
@Override
public void flatMap1(SwapEventWithTokenIds event, Collector<SwapEventWithTokenMetadata> out) throws Exception {
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
// We have both tokens, create SwapEventWithTokenMetadata
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
out.collect(swapWithMetadata);
} else {
// Cache the swap event until we get both tokens
pendingSwaps.put(event.getSwapEvent().transactionHash, event);
}
}
@Override
public void flatMap2(Token token, Collector<SwapEventWithTokenMetadata> out) throws Exception {
// Cache the token
tokenCache.put(token.address.toLowerCase(), token);
// Check if any pending swaps can now be completed
Iterator<Map.Entry<String, SwapEventWithTokenIds>> iterator = pendingSwaps.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SwapEventWithTokenIds> entry = iterator.next();
SwapEventWithTokenIds event = entry.getValue();
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
out.collect(swapWithMetadata);
iterator.remove();
}
}
}
});
// Apply SwapElaborator to create final Swap objects with calculated prices
DataStream<Swap> swaps = swapsWithTokens
.map(new SwapElaborator());
// Test OHLC with USDC/WETH pool
OHLCPipeline ohlcPipeline = new OHLCPipeline();
DataStream<OHLCCandle> allOhlc = ohlcPipeline.createOHLCStream(swaps);
// Filter and print OHLC candles for USDC/WETH pool only
allOhlc
.filter(candle -> {
// Filter for specific USDC/WETH pool
String poolAddress = candle.getPool().toLowerCase();
String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase();
return poolAddress.equals(targetPool);
})
.map(candle -> {
System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() +
" Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() +
" Trades=" + candle.getTradeCount() +
" Open=" + candle.getOpen() +
" High=" + candle.getHigh() +
" Low=" + candle.getLow() +
" Close=" + candle.getClose() +
" Volume=" + candle.getVolume());
return candle;
})
.setParallelism(1)
.returns(OHLCCandle.class)
.print("USDC-WETH-OHLC");
// Print the final enriched swap objects
swaps
.map(swap -> {
try { try {
String json = mapper.writeValueAsString(swap); return mapper.writeValueAsString(event);
return json;
} catch (Exception e) { } catch (Exception e) {
return "Error converting enriched swap to JSON: " + e.getMessage(); return "Error converting enriched swap event to JSON: " + e.getMessage();
} }
}) })
.print("Enriched Swap: "); .print("Fully Enriched Swap Event: ");
env.execute("Ethereum Block Stream"); env.execute("Ethereum Block Stream");
} }

View File

@@ -1,7 +0,0 @@
package stream.contract;
public class ArbitrumOne {
static public final int CHAIN_ID = 42161;
static public final String ADDR_USDC = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831";
}

View File

@@ -1,263 +0,0 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/hyperledger/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 4.1.1.
*/
public class ERC20 extends Contract {
private static final String BINARY = "Bin file was not provided";
public static final String FUNC_NAME = "name";
public static final String FUNC_APPROVE = "approve";
public static final String FUNC_TOTALSUPPLY = "totalSupply";
public static final String FUNC_TRANSFERFROM = "transferFrom";
public static final String FUNC_DECIMALS = "decimals";
public static final String FUNC_BALANCEOF = "balanceOf";
public static final String FUNC_SYMBOL = "symbol";
public static final String FUNC_TRANSFER = "transfer";
public static final String FUNC_ALLOWANCE = "allowance";
public static final Event TRANSFER_EVENT = new Event("Transfer",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
public static final Event APPROVAL_EVENT = new Event("Approval",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public RemoteCall<String> name() {
final Function function = new Function(FUNC_NAME,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> approve(String _spender, BigInteger _value) {
final Function function = new Function(
FUNC_APPROVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_spender),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> totalSupply() {
final Function function = new Function(FUNC_TOTALSUPPLY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<TransactionReceipt> transferFrom(String _from, String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFERFROM,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_from),
new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> decimals() {
final Function function = new Function(FUNC_DECIMALS,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<BigInteger> balanceOf(String _owner) {
final Function function = new Function(FUNC_BALANCEOF,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<String> symbol() {
final Function function = new Function(FUNC_SYMBOL,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> transfer(String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFER,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> allowance(String _owner, String _spender) {
final Function function = new Function(FUNC_ALLOWANCE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner),
new org.web3j.abi.datatypes.Address(_spender)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public List<TransferEventResponse> getTransferEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(TRANSFER_EVENT, transactionReceipt);
ArrayList<TransferEventResponse> responses = new ArrayList<TransferEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<TransferEventResponse> transferEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, TransferEventResponse>() {
@Override
public TransferEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(TRANSFER_EVENT, log);
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = log;
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<TransferEventResponse> transferEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(TRANSFER_EVENT));
return transferEventFlowable(filter);
}
public List<ApprovalEventResponse> getApprovalEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(APPROVAL_EVENT, transactionReceipt);
ArrayList<ApprovalEventResponse> responses = new ArrayList<ApprovalEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, ApprovalEventResponse>() {
@Override
public ApprovalEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(APPROVAL_EVENT, log);
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = log;
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(APPROVAL_EVENT));
return approvalEventFlowable(filter);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, credentials, contractGasProvider);
}
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class TransferEventResponse {
public Log log;
public String _from;
public String _to;
public BigInteger _value;
}
public static class ApprovalEventResponse {
public Log log;
public String _owner;
public String _spender;
public BigInteger _value;
}
}

View File

@@ -1,965 +0,0 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.DynamicArray;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.generated.Int128;
import org.web3j.abi.datatypes.generated.Int24;
import org.web3j.abi.datatypes.generated.Int256;
import org.web3j.abi.datatypes.generated.Int56;
import org.web3j.abi.datatypes.generated.Uint128;
import org.web3j.abi.datatypes.generated.Uint16;
import org.web3j.abi.datatypes.generated.Uint160;
import org.web3j.abi.datatypes.generated.Uint24;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint32;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteFunctionCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.BaseEventResponse;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tuples.generated.Tuple2;
import org.web3j.tuples.generated.Tuple3;
import org.web3j.tuples.generated.Tuple4;
import org.web3j.tuples.generated.Tuple5;
import org.web3j.tuples.generated.Tuple7;
import org.web3j.tuples.generated.Tuple8;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/LFDT-web3j/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 1.7.0.
*/
@SuppressWarnings("rawtypes")
public class UniswapV3Pool extends Contract {
public static final String BINARY = "Bin file was not provided";
public static final String FUNC_BURN = "burn";
public static final String FUNC_COLLECT = "collect";
public static final String FUNC_COLLECTPROTOCOL = "collectProtocol";
public static final String FUNC_FACTORY = "factory";
public static final String FUNC_FEE = "fee";
public static final String FUNC_FEEGROWTHGLOBAL0X128 = "feeGrowthGlobal0X128";
public static final String FUNC_FEEGROWTHGLOBAL1X128 = "feeGrowthGlobal1X128";
public static final String FUNC_FLASH = "flash";
public static final String FUNC_INCREASEOBSERVATIONCARDINALITYNEXT = "increaseObservationCardinalityNext";
public static final String FUNC_INITIALIZE = "initialize";
public static final String FUNC_LIQUIDITY = "liquidity";
public static final String FUNC_MAXLIQUIDITYPERTICK = "maxLiquidityPerTick";
public static final String FUNC_MINT = "mint";
public static final String FUNC_OBSERVATIONS = "observations";
public static final String FUNC_OBSERVE = "observe";
public static final String FUNC_POSITIONS = "positions";
public static final String FUNC_PROTOCOLFEES = "protocolFees";
public static final String FUNC_SETFEEPROTOCOL = "setFeeProtocol";
public static final String FUNC_SLOT0 = "slot0";
public static final String FUNC_SNAPSHOTCUMULATIVESINSIDE = "snapshotCumulativesInside";
public static final String FUNC_SWAP = "swap";
public static final String FUNC_TICKBITMAP = "tickBitmap";
public static final String FUNC_TICKSPACING = "tickSpacing";
public static final String FUNC_TICKS = "ticks";
public static final String FUNC_TOKEN0 = "token0";
public static final String FUNC_TOKEN1 = "token1";
public static final Event BURN_EVENT = new Event("Burn",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event COLLECT_EVENT = new Event("Collect",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>() {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
;
public static final Event COLLECTPROTOCOL_EVENT = new Event("CollectProtocol",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
;
public static final Event FLASH_EVENT = new Event("Flash",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event INCREASEOBSERVATIONCARDINALITYNEXT_EVENT = new Event("IncreaseObservationCardinalityNext",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}));
;
public static final Event INITIALIZE_EVENT = new Event("Initialize",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}));
;
public static final Event MINT_EVENT = new Event("Mint",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}, new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event SETFEEPROTOCOL_EVENT = new Event("SetFeeProtocol",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}));
;
public static final Event SWAP_EVENT = new Event("Swap",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Int256>() {}, new TypeReference<Int256>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint128>() {}, new TypeReference<Int24>() {}));
;
@Deprecated
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice,
BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials,
ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public static List<BurnEventResponse> getBurnEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(BURN_EVENT, transactionReceipt);
ArrayList<BurnEventResponse> responses = new ArrayList<BurnEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
BurnEventResponse typedResponse = new BurnEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
responses.add(typedResponse);
}
return responses;
}
public static BurnEventResponse getBurnEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(BURN_EVENT, log);
BurnEventResponse typedResponse = new BurnEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
return typedResponse;
}
public Flowable<BurnEventResponse> burnEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getBurnEventFromLog(log));
}
public Flowable<BurnEventResponse> burnEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(BURN_EVENT));
return burnEventFlowable(filter);
}
public static List<CollectEventResponse> getCollectEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECT_EVENT, transactionReceipt);
ArrayList<CollectEventResponse> responses = new ArrayList<CollectEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
CollectEventResponse typedResponse = new CollectEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
responses.add(typedResponse);
}
return responses;
}
public static CollectEventResponse getCollectEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECT_EVENT, log);
CollectEventResponse typedResponse = new CollectEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
return typedResponse;
}
public Flowable<CollectEventResponse> collectEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getCollectEventFromLog(log));
}
public Flowable<CollectEventResponse> collectEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(COLLECT_EVENT));
return collectEventFlowable(filter);
}
public static List<CollectProtocolEventResponse> getCollectProtocolEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, transactionReceipt);
ArrayList<CollectProtocolEventResponse> responses = new ArrayList<CollectProtocolEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static CollectProtocolEventResponse getCollectProtocolEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, log);
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getCollectProtocolEventFromLog(log));
}
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(COLLECTPROTOCOL_EVENT));
return collectProtocolEventFlowable(filter);
}
public static List<FlashEventResponse> getFlashEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(FLASH_EVENT, transactionReceipt);
ArrayList<FlashEventResponse> responses = new ArrayList<FlashEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
FlashEventResponse typedResponse = new FlashEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static FlashEventResponse getFlashEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(FLASH_EVENT, log);
FlashEventResponse typedResponse = new FlashEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<FlashEventResponse> flashEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getFlashEventFromLog(log));
}
public Flowable<FlashEventResponse> flashEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(FLASH_EVENT));
return flashEventFlowable(filter);
}
public static List<IncreaseObservationCardinalityNextEventResponse> getIncreaseObservationCardinalityNextEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, transactionReceipt);
ArrayList<IncreaseObservationCardinalityNextEventResponse> responses = new ArrayList<IncreaseObservationCardinalityNextEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static IncreaseObservationCardinalityNextEventResponse getIncreaseObservationCardinalityNextEventFromLog(
Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, log);
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
typedResponse.log = log;
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getIncreaseObservationCardinalityNextEventFromLog(log));
}
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT));
return increaseObservationCardinalityNextEventFlowable(filter);
}
public static List<InitializeEventResponse> getInitializeEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INITIALIZE_EVENT, transactionReceipt);
ArrayList<InitializeEventResponse> responses = new ArrayList<InitializeEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
InitializeEventResponse typedResponse = new InitializeEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static InitializeEventResponse getInitializeEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INITIALIZE_EVENT, log);
InitializeEventResponse typedResponse = new InitializeEventResponse();
typedResponse.log = log;
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<InitializeEventResponse> initializeEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getInitializeEventFromLog(log));
}
public Flowable<InitializeEventResponse> initializeEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(INITIALIZE_EVENT));
return initializeEventFlowable(filter);
}
public static List<MintEventResponse> getMintEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(MINT_EVENT, transactionReceipt);
ArrayList<MintEventResponse> responses = new ArrayList<MintEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
MintEventResponse typedResponse = new MintEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static MintEventResponse getMintEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(MINT_EVENT, log);
MintEventResponse typedResponse = new MintEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<MintEventResponse> mintEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getMintEventFromLog(log));
}
public Flowable<MintEventResponse> mintEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(MINT_EVENT));
return mintEventFlowable(filter);
}
public static List<SetFeeProtocolEventResponse> getSetFeeProtocolEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, transactionReceipt);
ArrayList<SetFeeProtocolEventResponse> responses = new ArrayList<SetFeeProtocolEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static SetFeeProtocolEventResponse getSetFeeProtocolEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, log);
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
typedResponse.log = log;
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getSetFeeProtocolEventFromLog(log));
}
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(SETFEEPROTOCOL_EVENT));
return setFeeProtocolEventFlowable(filter);
}
public static List<SwapEventResponse> getSwapEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SWAP_EVENT, transactionReceipt);
ArrayList<SwapEventResponse> responses = new ArrayList<SwapEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
SwapEventResponse typedResponse = new SwapEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
responses.add(typedResponse);
}
return responses;
}
public static SwapEventResponse getSwapEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SWAP_EVENT, log);
SwapEventResponse typedResponse = new SwapEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
return typedResponse;
}
public Flowable<SwapEventResponse> swapEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getSwapEventFromLog(log));
}
public Flowable<SwapEventResponse> swapEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(SWAP_EVENT));
return swapEventFlowable(filter);
}
public RemoteFunctionCall<TransactionReceipt> burn(BigInteger tickLower, BigInteger tickUpper,
BigInteger amount) {
final Function function = new Function(
FUNC_BURN,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> collect(String recipient, BigInteger tickLower,
BigInteger tickUpper, BigInteger amount0Requested, BigInteger amount1Requested) {
final Function function = new Function(
FUNC_COLLECT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> collectProtocol(String recipient,
BigInteger amount0Requested, BigInteger amount1Requested) {
final Function function = new Function(
FUNC_COLLECTPROTOCOL,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<String> factory() {
final Function function = new Function(FUNC_FACTORY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteFunctionCall<BigInteger> fee() {
final Function function = new Function(FUNC_FEE,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint24>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> feeGrowthGlobal0X128() {
final Function function = new Function(FUNC_FEEGROWTHGLOBAL0X128,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> feeGrowthGlobal1X128() {
final Function function = new Function(FUNC_FEEGROWTHGLOBAL1X128,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<TransactionReceipt> flash(String recipient, BigInteger amount0,
BigInteger amount1, byte[] data) {
final Function function = new Function(
FUNC_FLASH,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Uint256(amount0),
new org.web3j.abi.datatypes.generated.Uint256(amount1),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> increaseObservationCardinalityNext(
BigInteger observationCardinalityNext) {
final Function function = new Function(
FUNC_INCREASEOBSERVATIONCARDINALITYNEXT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint16(observationCardinalityNext)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> initialize(BigInteger sqrtPriceX96) {
final Function function = new Function(
FUNC_INITIALIZE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceX96)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<BigInteger> liquidity() {
final Function function = new Function(FUNC_LIQUIDITY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> maxLiquidityPerTick() {
final Function function = new Function(FUNC_MAXLIQUIDITYPERTICK,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<TransactionReceipt> mint(String recipient, BigInteger tickLower,
BigInteger tickUpper, BigInteger amount, byte[] data) {
final Function function = new Function(
FUNC_MINT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>> observations(
BigInteger param0) {
final Function function = new Function(FUNC_OBSERVATIONS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint256(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint32>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple4<BigInteger, BigInteger, BigInteger, Boolean> call() throws
Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple4<BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(Boolean) results.get(3).getValue());
}
});
}
public RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>> observe(
List<BigInteger> secondsAgos) {
final Function function = new Function(FUNC_OBSERVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.DynamicArray<org.web3j.abi.datatypes.generated.Uint32>(
org.web3j.abi.datatypes.generated.Uint32.class,
org.web3j.abi.Utils.typeMap(secondsAgos, org.web3j.abi.datatypes.generated.Uint32.class))),
Arrays.<TypeReference<?>>asList(new TypeReference<DynamicArray<Int56>>() {}, new TypeReference<DynamicArray<Uint160>>() {}));
return new RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>>(function,
new Callable<Tuple2<List<BigInteger>, List<BigInteger>>>() {
@Override
public Tuple2<List<BigInteger>, List<BigInteger>> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple2<List<BigInteger>, List<BigInteger>>(
convertToNative((List<Int56>) results.get(0).getValue()),
convertToNative((List<Uint160>) results.get(1).getValue()));
}
});
}
public RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>> positions(
byte[] param0) {
final Function function = new Function(FUNC_POSITIONS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Bytes32(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
return new RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>(function,
new Callable<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>() {
@Override
public Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger> call()
throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue());
}
});
}
public RemoteFunctionCall<Tuple2<BigInteger, BigInteger>> protocolFees() {
final Function function = new Function(FUNC_PROTOCOLFEES,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
return new RemoteFunctionCall<Tuple2<BigInteger, BigInteger>>(function,
new Callable<Tuple2<BigInteger, BigInteger>>() {
@Override
public Tuple2<BigInteger, BigInteger> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple2<BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue());
}
});
}
public RemoteFunctionCall<TransactionReceipt> setFeeProtocol(BigInteger feeProtocol0,
BigInteger feeProtocol1) {
final Function function = new Function(
FUNC_SETFEEPROTOCOL,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint8(feeProtocol0),
new org.web3j.abi.datatypes.generated.Uint8(feeProtocol1)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> slot0(
) {
final Function function = new Function(FUNC_SLOT0,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint8>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
) throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue(),
(BigInteger) results.get(5).getValue(),
(Boolean) results.get(6).getValue());
}
});
}
public RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>> snapshotCumulativesInside(
BigInteger tickLower, BigInteger tickUpper) {
final Function function = new Function(FUNC_SNAPSHOTCUMULATIVESINSIDE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper)),
Arrays.<TypeReference<?>>asList(new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}));
return new RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>>(function,
new Callable<Tuple3<BigInteger, BigInteger, BigInteger>>() {
@Override
public Tuple3<BigInteger, BigInteger, BigInteger> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple3<BigInteger, BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue());
}
});
}
public RemoteFunctionCall<TransactionReceipt> swap(String recipient, Boolean zeroForOne,
BigInteger amountSpecified, BigInteger sqrtPriceLimitX96, byte[] data) {
final Function function = new Function(
FUNC_SWAP,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.Bool(zeroForOne),
new org.web3j.abi.datatypes.generated.Int256(amountSpecified),
new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceLimitX96),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<BigInteger> tickBitmap(BigInteger param0) {
final Function function = new Function(FUNC_TICKBITMAP,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int16(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> tickSpacing() {
final Function function = new Function(FUNC_TICKSPACING,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Int24>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> ticks(
BigInteger param0) {
final Function function = new Function(FUNC_TICKS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Int128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
) throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue(),
(BigInteger) results.get(5).getValue(),
(BigInteger) results.get(6).getValue(),
(Boolean) results.get(7).getValue());
}
});
}
public RemoteFunctionCall<String> token0() {
final Function function = new Function(FUNC_TOKEN0,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteFunctionCall<String> token1() {
final Function function = new Function(FUNC_TOKEN1,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
@Deprecated
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
BigInteger gasPrice, BigInteger gasLimit) {
return new UniswapV3Pool(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new UniswapV3Pool(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
ContractGasProvider contractGasProvider) {
return new UniswapV3Pool(contractAddress, web3j, credentials, contractGasProvider);
}
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new UniswapV3Pool(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class BurnEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public BigInteger amount;
public BigInteger amount0;
public BigInteger amount1;
}
public static class CollectEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
}
public static class CollectProtocolEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
}
public static class FlashEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
public BigInteger paid0;
public BigInteger paid1;
}
public static class IncreaseObservationCardinalityNextEventResponse extends BaseEventResponse {
public BigInteger observationCardinalityNextOld;
public BigInteger observationCardinalityNextNew;
}
public static class InitializeEventResponse extends BaseEventResponse {
public BigInteger sqrtPriceX96;
public BigInteger tick;
}
public static class MintEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public String sender;
public BigInteger amount;
public BigInteger amount0;
public BigInteger amount1;
}
public static class SetFeeProtocolEventResponse extends BaseEventResponse {
public BigInteger feeProtocol0Old;
public BigInteger feeProtocol1Old;
public BigInteger feeProtocol0New;
public BigInteger feeProtocol1New;
}
public static class SwapEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
public BigInteger sqrtPriceX96;
public BigInteger liquidity;
public BigInteger tick;
}
}

View File

@@ -2,11 +2,4 @@ package stream.dto;
public class AddressId extends ChainId { public class AddressId extends ChainId {
public String address; public String address;
public AddressId() {}
public AddressId(int chainId, String address) {
super(chainId);
this.address = address;
}
} }

View File

@@ -12,9 +12,4 @@ public class BlockHash extends BlockId {
public Object getId() { public Object getId() {
return this.hash; return this.hash;
} }
@Override
public String toString() {
return this.hash;
}
} }

View File

@@ -12,9 +12,4 @@ public class BlockNumber extends BlockId {
public Object getId() { public Object getId() {
return this.number; return this.number;
} }
@Override
public String toString() {
return String.valueOf(this.number);
}
} }

View File

@@ -4,10 +4,4 @@ import java.io.Serializable;
public class ChainId implements Serializable { public class ChainId implements Serializable {
public int chainId; public int chainId;
public ChainId() {}
public ChainId(int chainId) {
this.chainId = chainId;
}
} }

View File

@@ -7,8 +7,6 @@ import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.Type; import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String; import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.*; import org.web3j.abi.datatypes.generated.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serial; import java.io.Serial;
import java.io.Serializable; import java.io.Serializable;
@@ -22,8 +20,6 @@ import static stream.io.EthUtils.keccak;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public class ElaboratedEventType<T extends EventLog> extends EventType { public class ElaboratedEventType<T extends EventLog> extends EventType {
private static final Logger LOG = LoggerFactory.getLogger(ElaboratedEventType.class);
public transient String name; public transient String name;
public transient List<String> paramNames; public transient List<String> paramNames;
@@ -67,9 +63,7 @@ public class ElaboratedEventType<T extends EventLog> extends EventType {
int dataValuesIndex = 0; int dataValuesIndex = 0;
int topicsIndex = 1; // the first topic is the event signature int topicsIndex = 1; // the first topic is the event signature
List<Object> args = new ArrayList<>(); List<Object> args = new ArrayList<>();
for (int i = 0; i < this.paramTypes.size(); i++) { for (TypeReference<Type> paramType : this.paramTypes) {
TypeReference<Type> paramType = this.paramTypes.get(i);
String paramName = this.paramNames.get(i);
Object value; Object value;
if (paramType.isIndexed()) { if (paramType.isIndexed()) {
String encoded = topics.get(topicsIndex++); String encoded = topics.get(topicsIndex++);

View File

@@ -0,0 +1,38 @@
package stream.dto;
import java.io.Serializable;
public class EnrichedSwapEventLog extends SwapEventLog implements Serializable {
// Token metadata for token0
public Token token0Metadata;
// Token metadata for token1
public Token token1Metadata;
public EnrichedSwapEventLog() {
super();
}
public static EnrichedSwapEventLog fromSwapEvent(SwapEventLog swapEvent) {
EnrichedSwapEventLog enriched = new EnrichedSwapEventLog();
// Copy all base swap event fields
enriched.address = swapEvent.address;
enriched.sender = swapEvent.sender;
enriched.recipient = swapEvent.recipient;
enriched.amount0 = swapEvent.amount0;
enriched.amount1 = swapEvent.amount1;
enriched.sqrtPriceX96 = swapEvent.sqrtPriceX96;
enriched.liquidity = swapEvent.liquidity;
enriched.tick = swapEvent.tick;
enriched.token0 = swapEvent.token0;
enriched.token1 = swapEvent.token1;
// Copy EventLog fields
enriched.blockNumber = swapEvent.blockNumber;
enriched.transactionHash = swapEvent.transactionHash;
return enriched;
}
}

View File

@@ -1,6 +1,6 @@
package stream.dto; package stream.dto;
public enum Exchange { enum Exchange {
UNISWAP_V2, UNISWAP_V2,
UNISWAP_V3, UNISWAP_V3,
UNISWAP_V4, UNISWAP_V4,

View File

@@ -1,56 +0,0 @@
package stream.dto;
import java.math.BigDecimal;
public class OHLCCandle {
private final String pool;
private final String token0;
private final String token1;
private final long windowStart;
private final long windowEnd;
private final BigDecimal open;
private final BigDecimal high;
private final BigDecimal low;
private final BigDecimal close;
private final BigDecimal volume;
private final int tradeCount;
public OHLCCandle(String pool, String token0, String token1,
long windowStart, long windowEnd,
BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close,
BigDecimal volume, int tradeCount) {
this.pool = pool;
this.token0 = token0;
this.token1 = token1;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.open = open;
this.high = high;
this.low = low;
this.close = close;
this.volume = volume;
this.tradeCount = tradeCount;
}
// Getters
public String getPool() { return pool; }
public String getToken0() { return token0; }
public String getToken1() { return token1; }
public long getWindowStart() { return windowStart; }
public long getWindowEnd() { return windowEnd; }
public BigDecimal getOpen() { return open; }
public BigDecimal getHigh() { return high; }
public BigDecimal getLow() { return low; }
public BigDecimal getClose() { return close; }
public BigDecimal getVolume() { return volume; }
public int getTradeCount() { return tradeCount; }
@Override
public String toString() {
return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d",
pool.substring(0, 8) + "...",
token0.substring(0, 6), token1.substring(0, 6),
windowStart, windowEnd,
open, high, low, close, volume, tradeCount);
}
}

View File

@@ -8,35 +8,11 @@ public class Swap extends ChainId {
@Serial @Serial
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public Long time; // milliseconds Long time; // milliseconds
public Exchange exchange; Exchange exchange;
public String pool; // address String pool; // address
public String takerAsset; // token address String takerAsset; // token address
public String makerAsset; // token address String makerAsset; // token address
public BigDecimal amountIn; // positive means the taker bought; negative means the taker sold. BigDecimal amount; // positive means the taker bought; negative means the taker sold.
public BigDecimal amountOut; // positive means the taker bought; negative means the taker sold. BigDecimal price;
public BigDecimal price;
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
BigDecimal amountIn, BigDecimal amountOut, BigDecimal price) {
super(chainId);
this.time = time;
this.exchange = exchange;
this.pool = pool;
this.takerAsset = takerAsset;
this.makerAsset = makerAsset;
this.amountIn = amountIn;
this.amountOut = amountOut;
this.price = price; // Total trade value
}
// Getter methods
public Long getTime() { return time; }
public Exchange getExchange() { return exchange; }
public String getPool() { return pool; }
public String getTakerAsset() { return takerAsset; }
public String getMakerAsset() { return makerAsset; }
public BigDecimal getAmountIn() { return amountIn; }
public BigDecimal getAmountOut() { return amountOut; }
public BigDecimal getPrice() { return price; }
} }

View File

@@ -29,13 +29,13 @@ public class SwapEventLog extends EventLog implements Serializable {
public String sender; public String sender;
public String recipient; public String recipient;
@BigInt
public BigInteger amount0; public BigInteger amount0;
@BigInt
public BigInteger amount1; public BigInteger amount1;
@BigInt
public BigInteger sqrtPriceX96; public BigInteger sqrtPriceX96;
@BigInt
public BigInteger liquidity; public BigInteger liquidity;
public int tick; public int tick;
public String getAddress() {
return this.address;
}
} }

View File

@@ -1,59 +0,0 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
public class SwapEventWithTokenIds implements Serializable {
private SwapEventLog swapEvent;
private String token0Address;
private String token1Address;
private BigInteger timestamp;
public SwapEventWithTokenIds() {
}
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address) {
this.swapEvent = swapEvent;
this.token0Address = token0Address;
this.token1Address = token1Address;
}
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address, BigInteger timestamp) {
this.swapEvent = swapEvent;
this.token0Address = token0Address;
this.token1Address = token1Address;
this.timestamp = timestamp;
}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
public void setSwapEvent(SwapEventLog swapEvent) {
this.swapEvent = swapEvent;
}
public String getToken0Address() {
return token0Address;
}
public void setToken0Address(String token0Address) {
this.token0Address = token0Address;
}
public String getToken1Address() {
return token1Address;
}
public void setToken1Address(String token1Address) {
this.token1Address = token1Address;
}
public BigInteger getTimestamp() {
return timestamp;
}
public void setTimestamp(BigInteger timestamp) {
this.timestamp = timestamp;
}
}

View File

@@ -1,42 +0,0 @@
package stream.dto;
import java.io.Serializable;
public class SwapEventWithTokenMetadata implements Serializable {
private SwapEventWithTokenIds swapEvent;
private Token token0;
private Token token1;
public SwapEventWithTokenMetadata() {
}
public SwapEventWithTokenMetadata(SwapEventWithTokenIds swapEvent, Token token0, Token token1) {
this.swapEvent = swapEvent;
this.token0 = token0;
this.token1 = token1;
}
public SwapEventWithTokenIds getSwapEvent() {
return swapEvent;
}
public void setSwapEvent(SwapEventWithTokenIds swapEvent) {
this.swapEvent = swapEvent;
}
public Token getToken0() {
return token0;
}
public void setToken0(Token token0) {
this.token0 = token0;
}
public Token getToken1() {
return token1;
}
public void setToken1(Token token1) {
this.token1 = token1;
}
}

View File

@@ -1,24 +0,0 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
public class SwapWithTimestamp implements Serializable {
private static final long serialVersionUID = 1L;
private final SwapEventLog swapEvent;
private final BigInteger timestamp;
public SwapWithTimestamp(SwapEventLog swapEvent, BigInteger timestamp) {
this.swapEvent = swapEvent;
this.timestamp = timestamp;
}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
public BigInteger getTimestamp() {
return timestamp;
}
}

View File

@@ -6,22 +6,31 @@ public class Token extends AddressId {
@Serial @Serial
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String name; private String name;
public String symbol; private String symbol;
public int decimals; private int decimals;
@SuppressWarnings("unused") public String getName() {
public Token() {} return name;
}
public Token(int chainId, String address, String name, String symbol, int decimals) {
super(chainId, address); public void setName(String name) {
this.name = name; this.name = name;
}
public String getSymbol() {
return symbol;
}
public void setSymbol(String symbol) {
this.symbol = symbol; this.symbol = symbol;
}
public int getDecimals() {
return decimals;
}
public void setDecimals(int decimals) {
this.decimals = decimals; this.decimals = decimals;
} }
@Override
public String toString() {
return String.format("Token[%s \"%s\" (%s) .%d]", this.address, this.name, this.symbol, this.decimals);
}
} }

View File

@@ -1,69 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.core.methods.response.EthBlock;
import stream.dto.BlockHash;
import stream.dto.BlockId;
import stream.dto.BlockNumber;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
private transient Web3j w3;
private static final AtomicInteger activeOperations = new AtomicInteger(0);
private static final AtomicInteger totalOperations = new AtomicInteger(0);
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
}
@Override
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
int active = activeOperations.incrementAndGet();
int total = totalOperations.incrementAndGet();
if (total % 100 == 0) {
log.info("BlockElaborator - Total operations: {}, Active operations: {}", total, active);
}
CompletableFuture.supplyAsync(() -> {
try {
return getBlock(blockId);
} catch (Exception e) {
log.error("Failed to get block {} on chain {}", blockId, e);
throw new RuntimeException("Error processing block " + blockId, e);
}
}).thenAccept(result -> {
activeOperations.decrementAndGet();
resultFuture.complete(Collections.singleton(result));
}).exceptionally(throwable -> {
activeOperations.decrementAndGet();
resultFuture.completeExceptionally(throwable);
return null;
});
}
private EthBlock getBlock(BlockId id) {
try {
return (id instanceof BlockNumber) ?
w3.ethGetBlockByNumber(new DefaultBlockParameterNumber(((BlockNumber) id).number), false).sendAsync().get() :
w3.ethGetBlockByHash(((BlockHash) id).hash, false).sendAsync().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,80 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.core.methods.response.EthBlock;
import stream.dto.BlockHash;
import stream.dto.SwapEventLog;
import stream.dto.SwapWithTimestamp;
import java.math.BigInteger;
import java.util.Collections;
public class BlockTimestampElaborator extends RichAsyncFunction<SwapEventLog, SwapWithTimestamp> {
private static final Logger log = LoggerFactory.getLogger(BlockTimestampElaborator.class);
private transient BlockElaborator blockElaborator;
@Override
public void open(OpenContext openContext) throws Exception {
blockElaborator = new BlockElaborator();
blockElaborator.setRuntimeContext(getRuntimeContext());
blockElaborator.open(openContext);
}
@Override
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<SwapWithTimestamp> resultFuture) {
try {
BlockHash blockHash = new BlockHash();
blockHash.hash = swapEvent.blockHash;
// Create a custom ResultFuture to handle the block elaboration result
ResultFuture<EthBlock> blockResultFuture = new ResultFuture<EthBlock>() {
@Override
public void complete(java.util.Collection<EthBlock> result) {
try {
if (!result.isEmpty()) {
EthBlock ethBlock = result.iterator().next();
BigInteger timestamp = ethBlock.getBlock().getTimestamp();
SwapWithTimestamp swapWithTimestamp = new SwapWithTimestamp(swapEvent, timestamp);
resultFuture.complete(Collections.singleton(swapWithTimestamp));
} else {
log.error("No block found for block hash {}", swapEvent.blockHash);
resultFuture.completeExceptionally(new RuntimeException("No block found"));
}
} catch (Exception e) {
log.error("Failed to get timestamp for block {} of swap in tx {}",
swapEvent.blockHash, swapEvent.transactionHash, e);
resultFuture.completeExceptionally(new RuntimeException("Error getting block timestamp", e));
}
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<EthBlock> collectionSupplier) {
try {
complete(collectionSupplier.get());
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
log.error("Failed to get block {} for swap in tx {}",
swapEvent.blockHash, swapEvent.transactionHash, error);
resultFuture.completeExceptionally(error);
}
};
// Invoke the block elaborator asynchronously
blockElaborator.asyncInvoke(blockHash, blockResultFuture);
} catch (Exception e) {
log.error("Error in BlockTimestampElaborator for swap in tx {}",
swapEvent.transactionHash, e);
resultFuture.completeExceptionally(e);
}
}
}

View File

@@ -1,9 +1,7 @@
package stream.io; package stream.io;
import java.util.HexFormat;
import org.bouncycastle.jcajce.provider.digest.Keccak; import org.bouncycastle.jcajce.provider.digest.Keccak;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
public class EthUtils { public class EthUtils {
public static boolean isValidAddress(String address) { public static boolean isValidAddress(String address) {
@@ -37,6 +35,6 @@ public class EthUtils {
public static String keccak(String message) { public static String keccak(String message) {
byte[] hash = new Keccak.Digest256().digest(message.getBytes()); byte[] hash = new Keccak.Digest256().digest(message.getBytes());
return "0x" + HexFormat.of().formatHex(hash); return "0x"+ByteUtils.toHexString(hash);
} }
} }

View File

@@ -1,34 +0,0 @@
package stream.io;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.JedisPooled;
import java.time.Duration;
import java.util.Map;
public class JedisClient {
protected static volatile JedisPooled jedis = null;
private static final Object jedisLock = new Object();
static public JedisPooled get(Map<String, String> params) {
if (jedis == null) {
synchronized (jedisLock) {
String url = params.getOrDefault("redis_url", "http://localhost:6379");
int connections = Integer.parseInt(params.getOrDefault("redis_connections", "8"));
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(connections);
poolConfig.setMaxIdle(connections);
poolConfig.setMinIdle(0);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWait(Duration.ofSeconds(5));
// Enables sending a PING command periodically while the connection is idle.
poolConfig.setTestWhileIdle(true);
// controls the period between checks for idle connections in the pool
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(10));
jedis = new JedisPooled(poolConfig, url);
}
}
return jedis;
}
}

View File

@@ -0,0 +1,52 @@
package stream.io;
public class PoolTokenFetcher {
// Uniswap V3 pool method signatures
private static final String TOKEN0_SIGNATURE = "0x0dfe1681"; // token0()
private static final String TOKEN1_SIGNATURE = "0xd21220a7"; // token1()
private final JsonRpcClient jsonRpcClient;
public PoolTokenFetcher(JsonRpcClient jsonRpcClient) {
this.jsonRpcClient = jsonRpcClient;
}
public String fetchToken0(String poolAddress) throws Throwable {
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN0_SIGNATURE);
Object[] params = new Object[]{request, "latest"};
String result = jsonRpcClient.invoke("eth_call", params, String.class);
return decodeAddress(result);
}
public String fetchToken1(String poolAddress) throws Throwable {
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN1_SIGNATURE);
Object[] params = new Object[]{request, "latest"};
String result = jsonRpcClient.invoke("eth_call", params, String.class);
return decodeAddress(result);
}
private String decodeAddress(String hexResult) {
if (hexResult == null || hexResult.equals("0x") || hexResult.length() < 42) {
return null;
}
// Address is the last 20 bytes (40 hex chars) of the result
// Remove 0x prefix and get last 40 characters, then add 0x back
String hex = hexResult.substring(2);
if (hex.length() >= 40) {
return "0x" + hex.substring(hex.length() - 40);
}
return hexResult; // Return as-is if unexpected format
}
private static class EthCallRequest {
public final String to;
public final String data;
public EthCallRequest(String to, String data) {
this.to = to;
this.data = data;
}
}
}

View File

@@ -1,81 +0,0 @@
package stream.io;
import stream.dto.SwapWithTimestamp;
import stream.dto.SwapEventWithTokenIds;
import stream.contract.UniswapV3Pool;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.web3j.crypto.Credentials;
import org.web3j.tx.gas.DefaultGasProvider;
import java.util.concurrent.CompletableFuture;
import java.util.Collections;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class PoolTokenIdElaborator extends RichAsyncFunction<SwapWithTimestamp, SwapEventWithTokenIds> {
private static final Logger log = LoggerFactory.getLogger(PoolTokenIdElaborator.class);
private transient Web3j web3j;
private transient Credentials credentials;
private transient DefaultGasProvider gasProvider;
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
// Get RPC URL from job parameters
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
// TODO: Get from configuration if needed
// Initialize Web3j
this.web3j = Web3j.build(new HttpService(rpcUrl));
// Dummy credentials for read-only operations
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
// Default gas provider
this.gasProvider = new DefaultGasProvider();
}
@Override
public void asyncInvoke(SwapWithTimestamp swapWithTimestamp, ResultFuture<SwapEventWithTokenIds> resultFuture) throws Exception {
CompletableFuture.supplyAsync(() -> {
try {
// Load the pool contract
UniswapV3Pool pool = UniswapV3Pool.load(
swapWithTimestamp.getSwapEvent().getAddress(), // Pool address from the event
web3j,
credentials,
gasProvider
);
// Get token addresses
String token0 = pool.token0().send();
String token1 = pool.token1().send();
// Create enriched event with timestamp
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), token0, token1, swapWithTimestamp.getTimestamp());
} catch (Exception e) {
log.error("Error fetching pool tokens for swap in pool {}",
swapWithTimestamp.getSwapEvent().getAddress(), e);
// Return original without enrichment but with timestamp
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), null, null, swapWithTimestamp.getTimestamp());
}
}).thenAccept(enriched -> {
resultFuture.complete(Collections.singletonList(enriched));
});
}
@Override
public void close() throws Exception {
if (web3j != null) {
web3j.shutdown();
}
}
}

View File

@@ -1,89 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.*;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.math.RoundingMode;
public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata, Swap> {
private static final Logger log = LoggerFactory.getLogger(SwapElaborator.class);
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
@Override
public Swap map(SwapEventWithTokenMetadata swapWithMetadata) throws Exception {
SwapEventWithTokenIds event = swapWithMetadata.getSwapEvent();
SwapEventLog swapLog = event.getSwapEvent();
Token token0 = swapWithMetadata.getToken0();
Token token1 = swapWithMetadata.getToken1();
// Use timestamp from block elaboration and set exchange as UNISWAP_V3
Long time = event.getTimestamp().longValue();
Exchange exchange = Exchange.UNISWAP_V3;
// Pool address
String pool = swapLog.address;
// Get sqrtPriceX96 and calculate actual price
BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96);
BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT);
BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT);
// Adjust price for decimals (price is token1/token0)
int decimalDiff = token0.decimals - token1.decimals;
BigDecimal adjustedPrice;
if (decimalDiff >= 0) {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff);
adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT);
} else {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
}
// Determine which token is in and which is out based on amount signs
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
String takerAsset;
String makerAsset;
BigDecimal amountIn;
BigDecimal amountOut;
BigDecimal finalPrice;
if (isToken0In) {
// User is sending token0, receiving token1
takerAsset = event.getToken0Address();
makerAsset = event.getToken1Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount0.abs())
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
amountOut = new BigDecimal(swapLog.amount1)
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
// Price is how much token1 you get per token0
finalPrice = adjustedPrice;
} else {
// User is sending token1, receiving token0
takerAsset = event.getToken1Address();
makerAsset = event.getToken0Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount1.abs())
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
amountOut = new BigDecimal(swapLog.amount0)
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
// We need to calculate price as token1/token0 to be consistent with token0 as quote currency
// Currently adjustedPrice = token1/token0, so we keep it as is
// This ensures price always represents how many token0 (quote) per 1 token1 (base)
// For example, in WETH/USDC: price = 2000 means 1 WETH costs 2000 USDC
finalPrice = adjustedPrice;
}
// Pass both amountIn and amountOut to constructor with unit price
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
}
}

View File

@@ -0,0 +1,161 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.EnrichedSwapEventLog;
import stream.dto.SwapEventLog;
import stream.dto.Token;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class SwapEventEnricher extends RichAsyncFunction<SwapEventLog, EnrichedSwapEventLog> {
private transient JsonRpcClient jsonRpcClient;
private transient PoolTokenFetcher poolTokenFetcher;
private transient TokenMetadataFetcher tokenMetadataFetcher;
private transient TokenCache tokenCache;
private final String rpcUrl;
public SwapEventEnricher(String rpcUrl) {
this.rpcUrl = rpcUrl;
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
this.poolTokenFetcher = new PoolTokenFetcher(jsonRpcClient);
this.tokenMetadataFetcher = new TokenMetadataFetcher(rpcUrl);
this.tokenMetadataFetcher.open(openContext);
this.tokenCache = new TokenCache();
}
@Override
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<EnrichedSwapEventLog> resultFuture) throws Exception {
try {
// First, get token addresses if not already populated
CompletableFuture<Void> tokenAddressFuture;
if (swapEvent.token0 == null || swapEvent.token1 == null) {
tokenAddressFuture = CompletableFuture.runAsync(() -> {
try {
if (swapEvent.token0 == null) {
swapEvent.token0 = poolTokenFetcher.fetchToken0(swapEvent.address);
}
if (swapEvent.token1 == null) {
swapEvent.token1 = poolTokenFetcher.fetchToken1(swapEvent.address);
}
} catch (Throwable e) {
throw new RuntimeException("Error fetching token addresses: " + e.getMessage(), e);
}
});
} else {
tokenAddressFuture = CompletableFuture.completedFuture(null);
}
// Then fetch metadata for both tokens
tokenAddressFuture.thenCompose(v -> {
if (swapEvent.token0 == null || swapEvent.token1 == null) {
return CompletableFuture.failedFuture(new RuntimeException("Failed to get token addresses"));
}
// Fetch metadata for both tokens concurrently (with caching)
CompletableFuture<Token> token0MetaFuture = tokenCache.getOrFetch(swapEvent.token0, tokenMetadataFetcher);
CompletableFuture<Token> token1MetaFuture = tokenCache.getOrFetch(swapEvent.token1, tokenMetadataFetcher);
return CompletableFuture.allOf(token0MetaFuture, token1MetaFuture)
.thenApply(ignored -> {
try {
Token token0Meta = token0MetaFuture.get();
Token token1Meta = token1MetaFuture.get();
// Create enriched event with token metadata
EnrichedSwapEventLog enriched = EnrichedSwapEventLog.fromSwapEvent(swapEvent);
// Set token metadata objects
if (token0Meta != null) {
enriched.token0Metadata = token0Meta;
// Also set the address in the Token object
token0Meta.address = swapEvent.token0;
}
if (token1Meta != null) {
enriched.token1Metadata = token1Meta;
// Also set the address in the Token object
token1Meta.address = swapEvent.token1;
}
return enriched;
} catch (Exception e) {
throw new RuntimeException("Error creating enriched event: " + e.getMessage(), e);
}
});
})
.thenAccept(enrichedEvent -> {
resultFuture.complete(Collections.singleton(enrichedEvent));
})
.exceptionally(throwable -> {
System.err.println("Error enriching swap event: " + throwable.getMessage());
throwable.printStackTrace();
resultFuture.completeExceptionally(throwable);
return null;
});
// Log cache stats periodically (every 100 events)
if (System.currentTimeMillis() % 10000 < 100) {
System.out.println("Token cache stats: " + tokenCache.getStats());
}
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
private static class MockResultFuture implements ResultFuture<Token> {
private Token result;
private Throwable error;
private boolean complete = false;
@Override
public void complete(java.util.Collection<Token> results) {
if (!results.isEmpty()) {
this.result = results.iterator().next();
}
this.complete = true;
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
try {
java.util.Collection<Token> resultCollection = results.get();
complete(resultCollection);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
this.error = error;
this.complete = true;
}
public boolean isComplete() {
return complete;
}
public boolean hasError() {
return error != null;
}
public Token getResult() {
return result;
}
public Throwable getError() {
return error;
}
}
}

View File

@@ -0,0 +1,142 @@
package stream.io;
import stream.dto.Token;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
/**
* Thread-safe cache for token metadata to avoid repeated RPC calls
* for the same token addresses.
*/
public class TokenCache {
private final ConcurrentHashMap<String, Token> cache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CompletableFuture<Token>> pendingRequests = new ConcurrentHashMap<>();
/**
* Get token from cache if present, otherwise return null
*/
public Token get(String tokenAddress) {
return cache.get(tokenAddress.toLowerCase());
}
/**
* Put token in cache
*/
public void put(String tokenAddress, Token token) {
if (token != null && tokenAddress != null) {
cache.put(tokenAddress.toLowerCase(), token);
}
}
/**
* Check if token is already in cache
*/
public boolean contains(String tokenAddress) {
return cache.containsKey(tokenAddress.toLowerCase());
}
/**
* Get or fetch token metadata with deduplication.
* If multiple requests come for the same token simultaneously,
* only one will fetch and others will wait for the result.
*/
public CompletableFuture<Token> getOrFetch(String tokenAddress, TokenMetadataFetcher fetcher) {
String normalizedAddress = tokenAddress.toLowerCase();
// Check cache first
Token cachedToken = cache.get(normalizedAddress);
if (cachedToken != null) {
return CompletableFuture.completedFuture(cachedToken);
}
// Check if there's already a pending request for this token
CompletableFuture<Token> pendingFuture = pendingRequests.get(normalizedAddress);
if (pendingFuture != null) {
return pendingFuture;
}
// Create new request
CompletableFuture<Token> future = new CompletableFuture<>();
// Register the pending request
CompletableFuture<Token> existingFuture = pendingRequests.putIfAbsent(normalizedAddress, future);
if (existingFuture != null) {
// Another thread beat us to it, return their future
return existingFuture;
}
// We're the first, so fetch the token metadata
try {
fetcher.asyncInvoke(tokenAddress, new org.apache.flink.streaming.api.functions.async.ResultFuture<Token>() {
@Override
public void complete(java.util.Collection<Token> results) {
Token token = results.isEmpty() ? null : results.iterator().next();
// Cache the result (even if null to avoid repeated failures)
if (token != null) {
cache.put(normalizedAddress, token);
}
// Complete the future
future.complete(token);
// Remove from pending requests
pendingRequests.remove(normalizedAddress);
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
try {
java.util.Collection<Token> resultCollection = results.get();
complete(resultCollection);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
future.completeExceptionally(error);
pendingRequests.remove(normalizedAddress);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
pendingRequests.remove(normalizedAddress);
}
return future;
}
/**
* Get cache statistics for monitoring
*/
public CacheStats getStats() {
return new CacheStats(cache.size(), pendingRequests.size());
}
/**
* Clear all cached entries (useful for testing or memory management)
*/
public void clear() {
cache.clear();
// Note: We don't clear pending requests as they're in flight
}
public static class CacheStats {
public final int cachedTokens;
public final int pendingRequests;
public CacheStats(int cachedTokens, int pendingRequests) {
this.cachedTokens = cachedTokens;
this.pendingRequests = pendingRequests;
}
@Override
public String toString() {
return String.format("TokenCache[cached=%d, pending=%d]", cachedTokens, pendingRequests);
}
}
}

View File

@@ -1,66 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.tx.ReadonlyTransactionManager;
import org.web3j.tx.gas.DefaultGasProvider;
import stream.contract.ERC20;
import stream.dto.AddressId;
import stream.dto.Token;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TokenElaborator extends RichAsyncFunction<AddressId, Token> {
private static final Logger log = LoggerFactory.getLogger(TokenElaborator.class);
private transient Web3j w3;
private DefaultGasProvider gasProvider;
private ReadonlyTransactionManager transactionManager;
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
gasProvider = new DefaultGasProvider();
transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234");
}
@Override
public void asyncInvoke(AddressId address, final ResultFuture<Token> resultFuture) {
log.info("Starting token processing for address: {} on chain: {}", address.address, address.chainId);
CompletableFuture.supplyAsync(() -> {
try {
return processToken(address);
} catch (Exception e) {
log.error("Failed to process token: {} on chain: {}", address.address, address.chainId, e);
throw new RuntimeException("Error processing token: " + address, e);
}
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
}
private Token processToken(AddressId address) {
String name;
String symbol;
int decimals;
ERC20 contract = ERC20.load(address.address, w3, transactionManager, gasProvider);
try {
CompletableFuture<BigInteger> decimalsAsync = contract.decimals().sendAsync();
CompletableFuture<String> nameAsync = contract.name().sendAsync();
CompletableFuture<String> symbolAsync = contract.symbol().sendAsync();
decimals = decimalsAsync.get().intValue();
name = nameAsync.get();
symbol = symbolAsync.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return new Token(address.chainId, address.address, name, symbol, decimals);
}
}

View File

@@ -0,0 +1,166 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.Token;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class TokenMetadataFetcher extends RichAsyncFunction<String, Token> {
// ERC-20 method signatures
private static final String NAME_SIGNATURE = "0x06fdde03";
private static final String SYMBOL_SIGNATURE = "0x95d89b41";
private static final String DECIMALS_SIGNATURE = "0x313ce567";
private static final String LATEST_BLOCK = "latest";
private transient JsonRpcClient jsonRpcClient;
private final String rpcUrl;
public TokenMetadataFetcher(String rpcUrl) {
this.rpcUrl = rpcUrl;
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
}
@Override
public void asyncInvoke(String tokenAddress, ResultFuture<Token> resultFuture) throws Exception {
try {
// Fetch all metadata concurrently
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchName(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> symbolFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchSymbol(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> decimalsFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchDecimals(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
// Combine all results
CompletableFuture.allOf(nameFuture, symbolFuture, decimalsFuture)
.thenAccept(v -> {
try {
Token token = new Token();
token.setName(decodeString(nameFuture.get()));
token.setSymbol(decodeString(symbolFuture.get()));
token.setDecimals(decodeUint8(decimalsFuture.get()));
resultFuture.complete(Collections.singleton(token));
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
})
.exceptionally(throwable -> {
resultFuture.completeExceptionally(throwable);
return null;
});
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
private String fetchName(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, NAME_SIGNATURE);
}
private String fetchSymbol(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, SYMBOL_SIGNATURE);
}
private String fetchDecimals(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, DECIMALS_SIGNATURE);
}
private String makeEthCall(String toAddress, String data) throws Throwable {
EthCallRequest request = new EthCallRequest(toAddress, data);
Object[] params = new Object[]{request, LATEST_BLOCK};
return jsonRpcClient.invoke("eth_call", params, String.class);
}
private String decodeString(String hexData) {
if (hexData == null || hexData.equals("0x") || hexData.length() < 66) {
return "Unknown";
}
try {
// Remove 0x prefix
String hex = hexData.substring(2);
// Skip the first 32 bytes (offset pointer)
// The actual string length is at bytes 32-64
String lengthHex = hex.substring(64, 128);
int length = Integer.parseInt(lengthHex, 16);
if (length <= 0 || length > 100) { // Sanity check
return "Invalid";
}
// Extract the actual string data starting at byte 64
String dataHex = hex.substring(128, 128 + (length * 2));
// Convert hex to string
StringBuilder result = new StringBuilder();
for (int i = 0; i < dataHex.length(); i += 2) {
String hexChar = dataHex.substring(i, i + 2);
int charCode = Integer.parseInt(hexChar, 16);
if (charCode > 0) { // Skip null bytes
result.append((char) charCode);
}
}
return result.toString();
} catch (Exception e) {
System.err.println("Error decoding string from hex: " + hexData + " - " + e.getMessage());
return "DecodeError";
}
}
private int decodeUint8(String hexData) {
if (hexData == null || hexData.equals("0x")) {
return 0;
}
try {
// Remove 0x prefix and get last byte (uint8)
String hex = hexData.substring(2);
// uint8 is the last 2 hex characters (1 byte)
String uint8Hex = hex.substring(hex.length() - 2);
return Integer.parseInt(uint8Hex, 16);
} catch (Exception e) {
System.err.println("Error decoding uint8 from hex: " + hexData + " - " + e.getMessage());
return 0;
}
}
private static class EthCallRequest {
public final String to;
public final String data;
public EthCallRequest(String to, String data) {
this.to = to;
this.data = data;
}
}
}

View File

@@ -1,50 +0,0 @@
package stream.io;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.config.StreamingDefaults;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.MODERN_TLS;
public class Web3Client {
private static final Logger log = LoggerFactory.getLogger(Web3Client.class);
private static volatile Web3j w3 = null;
private static final Object w3Lock = new Object();
static public Web3j get(Map<String, String> params) {
if (w3==null) {
synchronized (w3Lock) {
log.info("Initializing Web3 client");
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", String.valueOf(StreamingDefaults.MAX_IDLE_CONNECTIONS)));
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", String.valueOf(StreamingDefaults.KEEP_ALIVE_MINUTES)));
Duration timeout = Duration.ofSeconds(30);
ConnectionPool connectionPool = new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES);
log.info("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
url, maxIdleConnections, keepAlive, timeout.getSeconds());
var httpClient = new OkHttpClient.Builder()
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
.connectionPool(connectionPool)
.build();
w3 = Web3j.build(new HttpService(url, httpClient));
log.info("Web3 client initialized successfully");
}
}
return w3;
}
}

View File

@@ -1,41 +0,0 @@
package stream.ohlc;
import java.math.BigDecimal;
public class OHLCAccumulator {
public String pool = null;
public String token0 = null; // Added
public String token1 = null; // Added
public BigDecimal open = null;
public BigDecimal high = null;
public BigDecimal low = null;
public BigDecimal close = null;
public BigDecimal volume = BigDecimal.ZERO;
public int tradeCount = 0;
public long firstTradeTime = 0;
public long lastTradeTime = 0;
public OHLCAccumulator() {}
public void updatePrice(BigDecimal price, long timestamp) {
if (open == null) {
open = price;
high = price;
low = price;
firstTradeTime = timestamp;
} else {
high = high.max(price);
low = low.min(price);
}
close = price;
lastTradeTime = timestamp;
}
public void addVolume(BigDecimal amount) {
volume = volume.add(amount);
}
public void incrementTradeCount() {
tradeCount++;
}
}

View File

@@ -1,98 +0,0 @@
package stream.ohlc;
import org.apache.flink.api.common.functions.AggregateFunction;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
import java.math.BigDecimal;
public class OHLCAggregator implements AggregateFunction<Swap, OHLCAccumulator, OHLCCandle> {
private final long windowSize = 60; // 1 minute in seconds
@Override
public OHLCAccumulator createAccumulator() {
return new OHLCAccumulator();
}
@Override
public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) {
// Initialize pool and tokens on first swap
if (accumulator.pool == null) {
accumulator.pool = swap.getPool();
// Store tokens in consistent order (you could sort by address)
accumulator.token0 = swap.getTakerAsset();
accumulator.token1 = swap.getMakerAsset();
}
// Update OHLC prices
accumulator.updatePrice(swap.getPrice(), swap.getTime());
// Calculate volume (using the taker's input amount as volume)
BigDecimal volume = swap.getAmountIn().abs();
accumulator.addVolume(volume);
// Increment trade count
accumulator.incrementTradeCount();
return accumulator;
}
@Override
public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) {
OHLCAccumulator merged = new OHLCAccumulator();
// Merge pool and token info
merged.pool = acc1.pool != null ? acc1.pool : acc2.pool;
merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0;
merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1;
// Merge OHLC data
if (acc1.open != null && acc2.open != null) {
merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open;
merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close;
merged.high = acc1.high.max(acc2.high);
merged.low = acc1.low.min(acc2.low);
merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime);
merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime);
} else if (acc1.open != null) {
merged.open = acc1.open;
merged.close = acc1.close;
merged.high = acc1.high;
merged.low = acc1.low;
merged.firstTradeTime = acc1.firstTradeTime;
merged.lastTradeTime = acc1.lastTradeTime;
} else if (acc2.open != null) {
merged.open = acc2.open;
merged.close = acc2.close;
merged.high = acc2.high;
merged.low = acc2.low;
merged.firstTradeTime = acc2.firstTradeTime;
merged.lastTradeTime = acc2.lastTradeTime;
}
merged.volume = acc1.volume.add(acc2.volume);
merged.tradeCount = acc1.tradeCount + acc2.tradeCount;
return merged;
}
@Override
public OHLCCandle getResult(OHLCAccumulator accumulator) {
long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize;
long windowEnd = windowStart + windowSize;
return new OHLCCandle(
accumulator.pool,
accumulator.token0,
accumulator.token1,
windowStart,
windowEnd,
accumulator.open,
accumulator.high,
accumulator.low,
accumulator.close,
accumulator.volume,
accumulator.tradeCount
);
}
}

View File

@@ -1,26 +0,0 @@
package stream.ohlc;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import java.time.Duration;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
public class OHLCPipeline {
public DataStream<OHLCCandle> createOHLCStream(DataStream<Swap> swapStream) {
return swapStream
// NO watermarks needed for processing-time windows
.keyBy(Swap::getPool)
// Use 1-minute processing-time windows
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
// Simply use the aggregator
.aggregate(new OHLCAggregator())
// Filter out empty windows
.filter(candle -> candle.getTradeCount() > 0);
}
}