15 Commits

Author SHA1 Message Date
255d8f126d updating stream defauls like aysnc calls, wait timeout, max idle connection and keep alive counts to ensure the stream doesn't fail. Moving everything to a config file to easy updates 2025-10-10 14:54:16 -04:00
88fce17efc OHLC pipeline stuff 2025-10-09 13:06:22 -04:00
b4f1de9d0d removing price inversion in the swap ellaborator since we always want to represent the vlaue of the swap in its quote curency 2025-10-09 13:00:07 -04:00
0d8df3df9c adding renamed elaborators 2025-10-08 13:22:50 -04:00
2fc85c688b Using timestmap from blockhash. Also renaming elaborators to be more descriptive and removing unneeded logs and code 2025-10-08 13:16:35 -04:00
47fe85b50b Swap object stream with verified amount in and amount out 2025-10-07 20:11:01 -04:00
surbhi
6b4cad1479 Remove .mvn/jvm.config file 2025-10-06 19:25:35 -04:00
04d441d6b1 Remove SSH key files from repository 2025-10-06 19:18:33 -04:00
sjhavar
00ae425700 Add token metadata logging to swap event stream
- Integrated TokenElaborator to enrich swap events with token metadata
- Added comprehensive logging showing pool, token addresses, and metadata
- Increased timeout to handle rate limiting from RPC provider
- Simplified log output removing emoji formatting
2025-10-06 16:03:53 -04:00
sjhavar
e58fdc0b06 Add PoolElaborator for token address enrichment
- Created PoolElaborator async function to retrieve token0 and token1 addresses from Uniswap v3 pools
- Added ElaboratedSwapEvent DTO to store swap events with token addresses
- Updated SwapEventLog with getAddress() method for pool contract access
- Integrated PoolElaborator into DataStreamJob pipeline for real-time token enrichment
- Configured dynamic RPC URL retrieval from job parameters
2025-10-06 16:03:53 -04:00
tim
7449b4b4b1 BlockElaborator bugfix 2025-10-06 14:41:18 -04:00
tim
6829053d94 BlockElaborator 2025-10-06 12:33:45 -04:00
tim
3a64f5630c UniswapV3Pool wrapper 2025-10-06 12:00:07 -04:00
tim
9f65c2e850 Merge remote-tracking branch 'origin/main' 2025-09-28 14:10:00 -04:00
tim
8c672f250e TokenElaborator 2025-09-28 14:09:16 -04:00
34 changed files with 2310 additions and 627 deletions

35
pom.xml
View File

@@ -35,9 +35,11 @@ under the License.
<scala.binary.version>2.12</scala.binary.version>
<flink.version>2.1.0</flink.version>
<log4j.version>2.24.1</log4j.version>
<web3j.version>5.0.0</web3j.version>
<web3j.version>5.0.1</web3j.version>
<httpclient5.version>5.3.1</httpclient5.version>
<jsonrpc4j.version>1.6</jsonrpc4j.version>
<jedis.version>6.1.0</jedis.version>
<jackson.version>2.17.1</jackson.version>
</properties>
<repositories>
@@ -55,7 +57,23 @@ under the License.
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -132,7 +150,14 @@ under the License.
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
@@ -143,8 +168,8 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>16</source>
<target>16</target>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>

View File

@@ -19,18 +19,32 @@
package stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.ParameterTool;
import org.apache.flink.util.Collector;
import stream.config.StreamingDefaults;
import stream.dto.*;
import stream.io.SwapEventEnricher;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import stream.io.PoolTokenIdElaborator;
import stream.io.TokenElaborator;
import stream.io.SwapElaborator;
import stream.io.BlockTimestampElaborator;
import stream.ohlc.OHLCPipeline;
import stream.source.eventlog.EventLogSourceFactory;
import stream.source.newheads.NewHeadsSourceFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,43 +72,150 @@ public class DataStreamJob {
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// do not do this until considering how secrets are handled by flink
// env.getConfig().setGlobalJobParameters(parameters);
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
env.getConfig().setGlobalJobParameters(parameters);
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
// Async operation parameters
int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY);
int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS);
log.info("Async configuration - Capacity: {}, Timeout: {}s", asyncCapacity, asyncTimeoutSeconds);
// Create ObjectMapper for pretty JSON printing
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
DataStream<ArbitrumOneBlock> arbitrumHeads = env
.fromSource(
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
"ArbitrumOne Head Blocks",
TypeInformation.of(ArbitrumOneBlock.class)
);
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
// Create SwapEnricher to add token addresses and metadata
SwapEventEnricher swapEnricher = new SwapEventEnricher(httpUri.toString());
// Enrich swap events with token addresses and metadata
DataStream<EnrichedSwapEventLog> fullyEnrichedSwapStream = org.apache.flink.streaming.api.datastream.AsyncDataStream
.unorderedWait(swapStream, swapEnricher, 10000, java.util.concurrent.TimeUnit.MILLISECONDS)
.name("Swap Enricher");
// Add block timestamp to swap events
SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
swapStream,
new BlockTimestampElaborator(),
asyncTimeoutSeconds,
TimeUnit.SECONDS,
asyncCapacity
);
// Print fully enriched swap events with all metadata
fullyEnrichedSwapStream
.map(event -> {
SingleOutputStreamOperator<SwapEventWithTokenIds> elaboratedSwapStream = AsyncDataStream.unorderedWait(
swapWithTimestampStream,
new PoolTokenIdElaborator(),
asyncTimeoutSeconds,
TimeUnit.SECONDS,
asyncCapacity
);
// Extract token addresses and elaborate with metadata
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
.flatMap(new FlatMapFunction<SwapEventWithTokenIds, AddressId>() {
@Override
public void flatMap(SwapEventWithTokenIds event, Collector<AddressId> collector) throws Exception {
collector.collect(new AddressId(42161, event.getToken0Address()));
collector.collect(new AddressId(42161, event.getToken1Address()));
}
});
// Elaborate tokens with metadata
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
tokenAddresses,
new TokenElaborator(),
asyncTimeoutSeconds,
TimeUnit.SECONDS,
asyncCapacity
);
// Connect swap events with token metadata to create SwapEventWithTokenMetadata
DataStream<SwapEventWithTokenMetadata> swapsWithTokens = elaboratedSwapStream
.connect(elaboratedTokens)
.flatMap(new RichCoFlatMapFunction<SwapEventWithTokenIds, Token, SwapEventWithTokenMetadata>() {
private final Map<String, Token> tokenCache = new HashMap<>();
private final Map<String, SwapEventWithTokenIds> pendingSwaps = new HashMap<>();
@Override
public void flatMap1(SwapEventWithTokenIds event, Collector<SwapEventWithTokenMetadata> out) throws Exception {
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
// We have both tokens, create SwapEventWithTokenMetadata
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
out.collect(swapWithMetadata);
} else {
// Cache the swap event until we get both tokens
pendingSwaps.put(event.getSwapEvent().transactionHash, event);
}
}
@Override
public void flatMap2(Token token, Collector<SwapEventWithTokenMetadata> out) throws Exception {
// Cache the token
tokenCache.put(token.address.toLowerCase(), token);
// Check if any pending swaps can now be completed
Iterator<Map.Entry<String, SwapEventWithTokenIds>> iterator = pendingSwaps.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SwapEventWithTokenIds> entry = iterator.next();
SwapEventWithTokenIds event = entry.getValue();
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
out.collect(swapWithMetadata);
iterator.remove();
}
}
}
});
// Apply SwapElaborator to create final Swap objects with calculated prices
DataStream<Swap> swaps = swapsWithTokens
.map(new SwapElaborator());
// Test OHLC with USDC/WETH pool
OHLCPipeline ohlcPipeline = new OHLCPipeline();
DataStream<OHLCCandle> allOhlc = ohlcPipeline.createOHLCStream(swaps);
// Filter and print OHLC candles for USDC/WETH pool only
allOhlc
.filter(candle -> {
// Filter for specific USDC/WETH pool
String poolAddress = candle.getPool().toLowerCase();
String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase();
return poolAddress.equals(targetPool);
})
.map(candle -> {
System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() +
" Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() +
" Trades=" + candle.getTradeCount() +
" Open=" + candle.getOpen() +
" High=" + candle.getHigh() +
" Low=" + candle.getLow() +
" Close=" + candle.getClose() +
" Volume=" + candle.getVolume());
return candle;
})
.setParallelism(1)
.returns(OHLCCandle.class)
.print("USDC-WETH-OHLC");
// Print the final enriched swap objects
swaps
.map(swap -> {
try {
return mapper.writeValueAsString(event);
String json = mapper.writeValueAsString(swap);
return json;
} catch (Exception e) {
return "Error converting enriched swap event to JSON: " + e.getMessage();
return "Error converting enriched swap to JSON: " + e.getMessage();
}
})
.print("Fully Enriched Swap Event: ");
.print("Enriched Swap: ");
env.execute("Ethereum Block Stream");
}

View File

@@ -0,0 +1,7 @@
package stream.contract;
public class ArbitrumOne {
static public final int CHAIN_ID = 42161;
static public final String ADDR_USDC = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831";
}

View File

@@ -0,0 +1,263 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/hyperledger/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 4.1.1.
*/
public class ERC20 extends Contract {
private static final String BINARY = "Bin file was not provided";
public static final String FUNC_NAME = "name";
public static final String FUNC_APPROVE = "approve";
public static final String FUNC_TOTALSUPPLY = "totalSupply";
public static final String FUNC_TRANSFERFROM = "transferFrom";
public static final String FUNC_DECIMALS = "decimals";
public static final String FUNC_BALANCEOF = "balanceOf";
public static final String FUNC_SYMBOL = "symbol";
public static final String FUNC_TRANSFER = "transfer";
public static final String FUNC_ALLOWANCE = "allowance";
public static final Event TRANSFER_EVENT = new Event("Transfer",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
public static final Event APPROVAL_EVENT = new Event("Approval",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public RemoteCall<String> name() {
final Function function = new Function(FUNC_NAME,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> approve(String _spender, BigInteger _value) {
final Function function = new Function(
FUNC_APPROVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_spender),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> totalSupply() {
final Function function = new Function(FUNC_TOTALSUPPLY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<TransactionReceipt> transferFrom(String _from, String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFERFROM,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_from),
new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> decimals() {
final Function function = new Function(FUNC_DECIMALS,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<BigInteger> balanceOf(String _owner) {
final Function function = new Function(FUNC_BALANCEOF,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<String> symbol() {
final Function function = new Function(FUNC_SYMBOL,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> transfer(String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFER,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> allowance(String _owner, String _spender) {
final Function function = new Function(FUNC_ALLOWANCE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner),
new org.web3j.abi.datatypes.Address(_spender)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public List<TransferEventResponse> getTransferEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(TRANSFER_EVENT, transactionReceipt);
ArrayList<TransferEventResponse> responses = new ArrayList<TransferEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<TransferEventResponse> transferEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, TransferEventResponse>() {
@Override
public TransferEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(TRANSFER_EVENT, log);
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = log;
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<TransferEventResponse> transferEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(TRANSFER_EVENT));
return transferEventFlowable(filter);
}
public List<ApprovalEventResponse> getApprovalEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(APPROVAL_EVENT, transactionReceipt);
ArrayList<ApprovalEventResponse> responses = new ArrayList<ApprovalEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, ApprovalEventResponse>() {
@Override
public ApprovalEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(APPROVAL_EVENT, log);
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = log;
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(APPROVAL_EVENT));
return approvalEventFlowable(filter);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, credentials, contractGasProvider);
}
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class TransferEventResponse {
public Log log;
public String _from;
public String _to;
public BigInteger _value;
}
public static class ApprovalEventResponse {
public Log log;
public String _owner;
public String _spender;
public BigInteger _value;
}
}

View File

@@ -0,0 +1,965 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.DynamicArray;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.generated.Int128;
import org.web3j.abi.datatypes.generated.Int24;
import org.web3j.abi.datatypes.generated.Int256;
import org.web3j.abi.datatypes.generated.Int56;
import org.web3j.abi.datatypes.generated.Uint128;
import org.web3j.abi.datatypes.generated.Uint16;
import org.web3j.abi.datatypes.generated.Uint160;
import org.web3j.abi.datatypes.generated.Uint24;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint32;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteFunctionCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.BaseEventResponse;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tuples.generated.Tuple2;
import org.web3j.tuples.generated.Tuple3;
import org.web3j.tuples.generated.Tuple4;
import org.web3j.tuples.generated.Tuple5;
import org.web3j.tuples.generated.Tuple7;
import org.web3j.tuples.generated.Tuple8;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/LFDT-web3j/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 1.7.0.
*/
@SuppressWarnings("rawtypes")
public class UniswapV3Pool extends Contract {
public static final String BINARY = "Bin file was not provided";
public static final String FUNC_BURN = "burn";
public static final String FUNC_COLLECT = "collect";
public static final String FUNC_COLLECTPROTOCOL = "collectProtocol";
public static final String FUNC_FACTORY = "factory";
public static final String FUNC_FEE = "fee";
public static final String FUNC_FEEGROWTHGLOBAL0X128 = "feeGrowthGlobal0X128";
public static final String FUNC_FEEGROWTHGLOBAL1X128 = "feeGrowthGlobal1X128";
public static final String FUNC_FLASH = "flash";
public static final String FUNC_INCREASEOBSERVATIONCARDINALITYNEXT = "increaseObservationCardinalityNext";
public static final String FUNC_INITIALIZE = "initialize";
public static final String FUNC_LIQUIDITY = "liquidity";
public static final String FUNC_MAXLIQUIDITYPERTICK = "maxLiquidityPerTick";
public static final String FUNC_MINT = "mint";
public static final String FUNC_OBSERVATIONS = "observations";
public static final String FUNC_OBSERVE = "observe";
public static final String FUNC_POSITIONS = "positions";
public static final String FUNC_PROTOCOLFEES = "protocolFees";
public static final String FUNC_SETFEEPROTOCOL = "setFeeProtocol";
public static final String FUNC_SLOT0 = "slot0";
public static final String FUNC_SNAPSHOTCUMULATIVESINSIDE = "snapshotCumulativesInside";
public static final String FUNC_SWAP = "swap";
public static final String FUNC_TICKBITMAP = "tickBitmap";
public static final String FUNC_TICKSPACING = "tickSpacing";
public static final String FUNC_TICKS = "ticks";
public static final String FUNC_TOKEN0 = "token0";
public static final String FUNC_TOKEN1 = "token1";
public static final Event BURN_EVENT = new Event("Burn",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event COLLECT_EVENT = new Event("Collect",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>() {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
;
public static final Event COLLECTPROTOCOL_EVENT = new Event("CollectProtocol",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
;
public static final Event FLASH_EVENT = new Event("Flash",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event INCREASEOBSERVATIONCARDINALITYNEXT_EVENT = new Event("IncreaseObservationCardinalityNext",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}));
;
public static final Event INITIALIZE_EVENT = new Event("Initialize",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}));
;
public static final Event MINT_EVENT = new Event("Mint",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}, new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event SETFEEPROTOCOL_EVENT = new Event("SetFeeProtocol",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}));
;
public static final Event SWAP_EVENT = new Event("Swap",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Int256>() {}, new TypeReference<Int256>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint128>() {}, new TypeReference<Int24>() {}));
;
@Deprecated
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice,
BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials,
ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public static List<BurnEventResponse> getBurnEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(BURN_EVENT, transactionReceipt);
ArrayList<BurnEventResponse> responses = new ArrayList<BurnEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
BurnEventResponse typedResponse = new BurnEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
responses.add(typedResponse);
}
return responses;
}
public static BurnEventResponse getBurnEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(BURN_EVENT, log);
BurnEventResponse typedResponse = new BurnEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
return typedResponse;
}
public Flowable<BurnEventResponse> burnEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getBurnEventFromLog(log));
}
public Flowable<BurnEventResponse> burnEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(BURN_EVENT));
return burnEventFlowable(filter);
}
public static List<CollectEventResponse> getCollectEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECT_EVENT, transactionReceipt);
ArrayList<CollectEventResponse> responses = new ArrayList<CollectEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
CollectEventResponse typedResponse = new CollectEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
responses.add(typedResponse);
}
return responses;
}
public static CollectEventResponse getCollectEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECT_EVENT, log);
CollectEventResponse typedResponse = new CollectEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
return typedResponse;
}
public Flowable<CollectEventResponse> collectEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getCollectEventFromLog(log));
}
public Flowable<CollectEventResponse> collectEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(COLLECT_EVENT));
return collectEventFlowable(filter);
}
public static List<CollectProtocolEventResponse> getCollectProtocolEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, transactionReceipt);
ArrayList<CollectProtocolEventResponse> responses = new ArrayList<CollectProtocolEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static CollectProtocolEventResponse getCollectProtocolEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, log);
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getCollectProtocolEventFromLog(log));
}
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(COLLECTPROTOCOL_EVENT));
return collectProtocolEventFlowable(filter);
}
public static List<FlashEventResponse> getFlashEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(FLASH_EVENT, transactionReceipt);
ArrayList<FlashEventResponse> responses = new ArrayList<FlashEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
FlashEventResponse typedResponse = new FlashEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static FlashEventResponse getFlashEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(FLASH_EVENT, log);
FlashEventResponse typedResponse = new FlashEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<FlashEventResponse> flashEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getFlashEventFromLog(log));
}
public Flowable<FlashEventResponse> flashEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(FLASH_EVENT));
return flashEventFlowable(filter);
}
public static List<IncreaseObservationCardinalityNextEventResponse> getIncreaseObservationCardinalityNextEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, transactionReceipt);
ArrayList<IncreaseObservationCardinalityNextEventResponse> responses = new ArrayList<IncreaseObservationCardinalityNextEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static IncreaseObservationCardinalityNextEventResponse getIncreaseObservationCardinalityNextEventFromLog(
Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, log);
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
typedResponse.log = log;
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getIncreaseObservationCardinalityNextEventFromLog(log));
}
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT));
return increaseObservationCardinalityNextEventFlowable(filter);
}
public static List<InitializeEventResponse> getInitializeEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INITIALIZE_EVENT, transactionReceipt);
ArrayList<InitializeEventResponse> responses = new ArrayList<InitializeEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
InitializeEventResponse typedResponse = new InitializeEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static InitializeEventResponse getInitializeEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INITIALIZE_EVENT, log);
InitializeEventResponse typedResponse = new InitializeEventResponse();
typedResponse.log = log;
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<InitializeEventResponse> initializeEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getInitializeEventFromLog(log));
}
public Flowable<InitializeEventResponse> initializeEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(INITIALIZE_EVENT));
return initializeEventFlowable(filter);
}
public static List<MintEventResponse> getMintEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(MINT_EVENT, transactionReceipt);
ArrayList<MintEventResponse> responses = new ArrayList<MintEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
MintEventResponse typedResponse = new MintEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static MintEventResponse getMintEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(MINT_EVENT, log);
MintEventResponse typedResponse = new MintEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<MintEventResponse> mintEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getMintEventFromLog(log));
}
public Flowable<MintEventResponse> mintEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(MINT_EVENT));
return mintEventFlowable(filter);
}
public static List<SetFeeProtocolEventResponse> getSetFeeProtocolEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, transactionReceipt);
ArrayList<SetFeeProtocolEventResponse> responses = new ArrayList<SetFeeProtocolEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static SetFeeProtocolEventResponse getSetFeeProtocolEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, log);
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
typedResponse.log = log;
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getSetFeeProtocolEventFromLog(log));
}
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(SETFEEPROTOCOL_EVENT));
return setFeeProtocolEventFlowable(filter);
}
public static List<SwapEventResponse> getSwapEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SWAP_EVENT, transactionReceipt);
ArrayList<SwapEventResponse> responses = new ArrayList<SwapEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
SwapEventResponse typedResponse = new SwapEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
responses.add(typedResponse);
}
return responses;
}
public static SwapEventResponse getSwapEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SWAP_EVENT, log);
SwapEventResponse typedResponse = new SwapEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
return typedResponse;
}
public Flowable<SwapEventResponse> swapEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getSwapEventFromLog(log));
}
public Flowable<SwapEventResponse> swapEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(SWAP_EVENT));
return swapEventFlowable(filter);
}
public RemoteFunctionCall<TransactionReceipt> burn(BigInteger tickLower, BigInteger tickUpper,
BigInteger amount) {
final Function function = new Function(
FUNC_BURN,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> collect(String recipient, BigInteger tickLower,
BigInteger tickUpper, BigInteger amount0Requested, BigInteger amount1Requested) {
final Function function = new Function(
FUNC_COLLECT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> collectProtocol(String recipient,
BigInteger amount0Requested, BigInteger amount1Requested) {
final Function function = new Function(
FUNC_COLLECTPROTOCOL,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<String> factory() {
final Function function = new Function(FUNC_FACTORY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteFunctionCall<BigInteger> fee() {
final Function function = new Function(FUNC_FEE,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint24>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> feeGrowthGlobal0X128() {
final Function function = new Function(FUNC_FEEGROWTHGLOBAL0X128,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> feeGrowthGlobal1X128() {
final Function function = new Function(FUNC_FEEGROWTHGLOBAL1X128,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<TransactionReceipt> flash(String recipient, BigInteger amount0,
BigInteger amount1, byte[] data) {
final Function function = new Function(
FUNC_FLASH,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Uint256(amount0),
new org.web3j.abi.datatypes.generated.Uint256(amount1),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> increaseObservationCardinalityNext(
BigInteger observationCardinalityNext) {
final Function function = new Function(
FUNC_INCREASEOBSERVATIONCARDINALITYNEXT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint16(observationCardinalityNext)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> initialize(BigInteger sqrtPriceX96) {
final Function function = new Function(
FUNC_INITIALIZE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceX96)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<BigInteger> liquidity() {
final Function function = new Function(FUNC_LIQUIDITY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> maxLiquidityPerTick() {
final Function function = new Function(FUNC_MAXLIQUIDITYPERTICK,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<TransactionReceipt> mint(String recipient, BigInteger tickLower,
BigInteger tickUpper, BigInteger amount, byte[] data) {
final Function function = new Function(
FUNC_MINT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>> observations(
BigInteger param0) {
final Function function = new Function(FUNC_OBSERVATIONS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint256(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint32>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple4<BigInteger, BigInteger, BigInteger, Boolean> call() throws
Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple4<BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(Boolean) results.get(3).getValue());
}
});
}
public RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>> observe(
List<BigInteger> secondsAgos) {
final Function function = new Function(FUNC_OBSERVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.DynamicArray<org.web3j.abi.datatypes.generated.Uint32>(
org.web3j.abi.datatypes.generated.Uint32.class,
org.web3j.abi.Utils.typeMap(secondsAgos, org.web3j.abi.datatypes.generated.Uint32.class))),
Arrays.<TypeReference<?>>asList(new TypeReference<DynamicArray<Int56>>() {}, new TypeReference<DynamicArray<Uint160>>() {}));
return new RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>>(function,
new Callable<Tuple2<List<BigInteger>, List<BigInteger>>>() {
@Override
public Tuple2<List<BigInteger>, List<BigInteger>> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple2<List<BigInteger>, List<BigInteger>>(
convertToNative((List<Int56>) results.get(0).getValue()),
convertToNative((List<Uint160>) results.get(1).getValue()));
}
});
}
public RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>> positions(
byte[] param0) {
final Function function = new Function(FUNC_POSITIONS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Bytes32(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
return new RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>(function,
new Callable<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>() {
@Override
public Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger> call()
throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue());
}
});
}
public RemoteFunctionCall<Tuple2<BigInteger, BigInteger>> protocolFees() {
final Function function = new Function(FUNC_PROTOCOLFEES,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
return new RemoteFunctionCall<Tuple2<BigInteger, BigInteger>>(function,
new Callable<Tuple2<BigInteger, BigInteger>>() {
@Override
public Tuple2<BigInteger, BigInteger> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple2<BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue());
}
});
}
public RemoteFunctionCall<TransactionReceipt> setFeeProtocol(BigInteger feeProtocol0,
BigInteger feeProtocol1) {
final Function function = new Function(
FUNC_SETFEEPROTOCOL,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint8(feeProtocol0),
new org.web3j.abi.datatypes.generated.Uint8(feeProtocol1)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> slot0(
) {
final Function function = new Function(FUNC_SLOT0,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint8>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
) throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue(),
(BigInteger) results.get(5).getValue(),
(Boolean) results.get(6).getValue());
}
});
}
public RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>> snapshotCumulativesInside(
BigInteger tickLower, BigInteger tickUpper) {
final Function function = new Function(FUNC_SNAPSHOTCUMULATIVESINSIDE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper)),
Arrays.<TypeReference<?>>asList(new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}));
return new RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>>(function,
new Callable<Tuple3<BigInteger, BigInteger, BigInteger>>() {
@Override
public Tuple3<BigInteger, BigInteger, BigInteger> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple3<BigInteger, BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue());
}
});
}
public RemoteFunctionCall<TransactionReceipt> swap(String recipient, Boolean zeroForOne,
BigInteger amountSpecified, BigInteger sqrtPriceLimitX96, byte[] data) {
final Function function = new Function(
FUNC_SWAP,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.Bool(zeroForOne),
new org.web3j.abi.datatypes.generated.Int256(amountSpecified),
new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceLimitX96),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<BigInteger> tickBitmap(BigInteger param0) {
final Function function = new Function(FUNC_TICKBITMAP,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int16(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> tickSpacing() {
final Function function = new Function(FUNC_TICKSPACING,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Int24>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> ticks(
BigInteger param0) {
final Function function = new Function(FUNC_TICKS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Int128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
) throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue(),
(BigInteger) results.get(5).getValue(),
(BigInteger) results.get(6).getValue(),
(Boolean) results.get(7).getValue());
}
});
}
public RemoteFunctionCall<String> token0() {
final Function function = new Function(FUNC_TOKEN0,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteFunctionCall<String> token1() {
final Function function = new Function(FUNC_TOKEN1,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
@Deprecated
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
BigInteger gasPrice, BigInteger gasLimit) {
return new UniswapV3Pool(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new UniswapV3Pool(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
ContractGasProvider contractGasProvider) {
return new UniswapV3Pool(contractAddress, web3j, credentials, contractGasProvider);
}
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new UniswapV3Pool(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class BurnEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public BigInteger amount;
public BigInteger amount0;
public BigInteger amount1;
}
public static class CollectEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
}
public static class CollectProtocolEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
}
public static class FlashEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
public BigInteger paid0;
public BigInteger paid1;
}
public static class IncreaseObservationCardinalityNextEventResponse extends BaseEventResponse {
public BigInteger observationCardinalityNextOld;
public BigInteger observationCardinalityNextNew;
}
public static class InitializeEventResponse extends BaseEventResponse {
public BigInteger sqrtPriceX96;
public BigInteger tick;
}
public static class MintEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public String sender;
public BigInteger amount;
public BigInteger amount0;
public BigInteger amount1;
}
public static class SetFeeProtocolEventResponse extends BaseEventResponse {
public BigInteger feeProtocol0Old;
public BigInteger feeProtocol1Old;
public BigInteger feeProtocol0New;
public BigInteger feeProtocol1New;
}
public static class SwapEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
public BigInteger sqrtPriceX96;
public BigInteger liquidity;
public BigInteger tick;
}
}

View File

@@ -2,4 +2,11 @@ package stream.dto;
public class AddressId extends ChainId {
public String address;
public AddressId() {}
public AddressId(int chainId, String address) {
super(chainId);
this.address = address;
}
}

View File

@@ -12,4 +12,9 @@ public class BlockHash extends BlockId {
public Object getId() {
return this.hash;
}
@Override
public String toString() {
return this.hash;
}
}

View File

@@ -12,4 +12,9 @@ public class BlockNumber extends BlockId {
public Object getId() {
return this.number;
}
@Override
public String toString() {
return String.valueOf(this.number);
}
}

View File

@@ -4,4 +4,10 @@ import java.io.Serializable;
public class ChainId implements Serializable {
public int chainId;
public ChainId() {}
public ChainId(int chainId) {
this.chainId = chainId;
}
}

View File

@@ -7,6 +7,8 @@ import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serial;
import java.io.Serializable;
@@ -20,6 +22,8 @@ import static stream.io.EthUtils.keccak;
@SuppressWarnings("rawtypes")
public class ElaboratedEventType<T extends EventLog> extends EventType {
private static final Logger LOG = LoggerFactory.getLogger(ElaboratedEventType.class);
public transient String name;
public transient List<String> paramNames;
@@ -63,7 +67,9 @@ public class ElaboratedEventType<T extends EventLog> extends EventType {
int dataValuesIndex = 0;
int topicsIndex = 1; // the first topic is the event signature
List<Object> args = new ArrayList<>();
for (TypeReference<Type> paramType : this.paramTypes) {
for (int i = 0; i < this.paramTypes.size(); i++) {
TypeReference<Type> paramType = this.paramTypes.get(i);
String paramName = this.paramNames.get(i);
Object value;
if (paramType.isIndexed()) {
String encoded = topics.get(topicsIndex++);

View File

@@ -1,38 +0,0 @@
package stream.dto;
import java.io.Serializable;
public class EnrichedSwapEventLog extends SwapEventLog implements Serializable {
// Token metadata for token0
public Token token0Metadata;
// Token metadata for token1
public Token token1Metadata;
public EnrichedSwapEventLog() {
super();
}
public static EnrichedSwapEventLog fromSwapEvent(SwapEventLog swapEvent) {
EnrichedSwapEventLog enriched = new EnrichedSwapEventLog();
// Copy all base swap event fields
enriched.address = swapEvent.address;
enriched.sender = swapEvent.sender;
enriched.recipient = swapEvent.recipient;
enriched.amount0 = swapEvent.amount0;
enriched.amount1 = swapEvent.amount1;
enriched.sqrtPriceX96 = swapEvent.sqrtPriceX96;
enriched.liquidity = swapEvent.liquidity;
enriched.tick = swapEvent.tick;
enriched.token0 = swapEvent.token0;
enriched.token1 = swapEvent.token1;
// Copy EventLog fields
enriched.blockNumber = swapEvent.blockNumber;
enriched.transactionHash = swapEvent.transactionHash;
return enriched;
}
}

View File

@@ -1,6 +1,6 @@
package stream.dto;
enum Exchange {
public enum Exchange {
UNISWAP_V2,
UNISWAP_V3,
UNISWAP_V4,

View File

@@ -0,0 +1,56 @@
package stream.dto;
import java.math.BigDecimal;
public class OHLCCandle {
private final String pool;
private final String token0;
private final String token1;
private final long windowStart;
private final long windowEnd;
private final BigDecimal open;
private final BigDecimal high;
private final BigDecimal low;
private final BigDecimal close;
private final BigDecimal volume;
private final int tradeCount;
public OHLCCandle(String pool, String token0, String token1,
long windowStart, long windowEnd,
BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close,
BigDecimal volume, int tradeCount) {
this.pool = pool;
this.token0 = token0;
this.token1 = token1;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.open = open;
this.high = high;
this.low = low;
this.close = close;
this.volume = volume;
this.tradeCount = tradeCount;
}
// Getters
public String getPool() { return pool; }
public String getToken0() { return token0; }
public String getToken1() { return token1; }
public long getWindowStart() { return windowStart; }
public long getWindowEnd() { return windowEnd; }
public BigDecimal getOpen() { return open; }
public BigDecimal getHigh() { return high; }
public BigDecimal getLow() { return low; }
public BigDecimal getClose() { return close; }
public BigDecimal getVolume() { return volume; }
public int getTradeCount() { return tradeCount; }
@Override
public String toString() {
return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d",
pool.substring(0, 8) + "...",
token0.substring(0, 6), token1.substring(0, 6),
windowStart, windowEnd,
open, high, low, close, volume, tradeCount);
}
}

View File

@@ -8,11 +8,35 @@ public class Swap extends ChainId {
@Serial
private static final long serialVersionUID = 1L;
Long time; // milliseconds
Exchange exchange;
String pool; // address
String takerAsset; // token address
String makerAsset; // token address
BigDecimal amount; // positive means the taker bought; negative means the taker sold.
BigDecimal price;
public Long time; // milliseconds
public Exchange exchange;
public String pool; // address
public String takerAsset; // token address
public String makerAsset; // token address
public BigDecimal amountIn; // positive means the taker bought; negative means the taker sold.
public BigDecimal amountOut; // positive means the taker bought; negative means the taker sold.
public BigDecimal price;
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
BigDecimal amountIn, BigDecimal amountOut, BigDecimal price) {
super(chainId);
this.time = time;
this.exchange = exchange;
this.pool = pool;
this.takerAsset = takerAsset;
this.makerAsset = makerAsset;
this.amountIn = amountIn;
this.amountOut = amountOut;
this.price = price; // Total trade value
}
// Getter methods
public Long getTime() { return time; }
public Exchange getExchange() { return exchange; }
public String getPool() { return pool; }
public String getTakerAsset() { return takerAsset; }
public String getMakerAsset() { return makerAsset; }
public BigDecimal getAmountIn() { return amountIn; }
public BigDecimal getAmountOut() { return amountOut; }
public BigDecimal getPrice() { return price; }
}

View File

@@ -29,13 +29,13 @@ public class SwapEventLog extends EventLog implements Serializable {
public String sender;
public String recipient;
@BigInt
public BigInteger amount0;
@BigInt
public BigInteger amount1;
@BigInt
public BigInteger sqrtPriceX96;
@BigInt
public BigInteger liquidity;
public int tick;
public String getAddress() {
return this.address;
}
}

View File

@@ -0,0 +1,59 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
public class SwapEventWithTokenIds implements Serializable {
private SwapEventLog swapEvent;
private String token0Address;
private String token1Address;
private BigInteger timestamp;
public SwapEventWithTokenIds() {
}
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address) {
this.swapEvent = swapEvent;
this.token0Address = token0Address;
this.token1Address = token1Address;
}
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address, BigInteger timestamp) {
this.swapEvent = swapEvent;
this.token0Address = token0Address;
this.token1Address = token1Address;
this.timestamp = timestamp;
}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
public void setSwapEvent(SwapEventLog swapEvent) {
this.swapEvent = swapEvent;
}
public String getToken0Address() {
return token0Address;
}
public void setToken0Address(String token0Address) {
this.token0Address = token0Address;
}
public String getToken1Address() {
return token1Address;
}
public void setToken1Address(String token1Address) {
this.token1Address = token1Address;
}
public BigInteger getTimestamp() {
return timestamp;
}
public void setTimestamp(BigInteger timestamp) {
this.timestamp = timestamp;
}
}

View File

@@ -0,0 +1,42 @@
package stream.dto;
import java.io.Serializable;
public class SwapEventWithTokenMetadata implements Serializable {
private SwapEventWithTokenIds swapEvent;
private Token token0;
private Token token1;
public SwapEventWithTokenMetadata() {
}
public SwapEventWithTokenMetadata(SwapEventWithTokenIds swapEvent, Token token0, Token token1) {
this.swapEvent = swapEvent;
this.token0 = token0;
this.token1 = token1;
}
public SwapEventWithTokenIds getSwapEvent() {
return swapEvent;
}
public void setSwapEvent(SwapEventWithTokenIds swapEvent) {
this.swapEvent = swapEvent;
}
public Token getToken0() {
return token0;
}
public void setToken0(Token token0) {
this.token0 = token0;
}
public Token getToken1() {
return token1;
}
public void setToken1(Token token1) {
this.token1 = token1;
}
}

View File

@@ -0,0 +1,24 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
public class SwapWithTimestamp implements Serializable {
private static final long serialVersionUID = 1L;
private final SwapEventLog swapEvent;
private final BigInteger timestamp;
public SwapWithTimestamp(SwapEventLog swapEvent, BigInteger timestamp) {
this.swapEvent = swapEvent;
this.timestamp = timestamp;
}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
public BigInteger getTimestamp() {
return timestamp;
}
}

View File

@@ -6,31 +6,22 @@ public class Token extends AddressId {
@Serial
private static final long serialVersionUID = 1L;
private String name;
private String symbol;
private int decimals;
public String getName() {
return name;
}
public void setName(String name) {
public String name;
public String symbol;
public int decimals;
@SuppressWarnings("unused")
public Token() {}
public Token(int chainId, String address, String name, String symbol, int decimals) {
super(chainId, address);
this.name = name;
}
public String getSymbol() {
return symbol;
}
public void setSymbol(String symbol) {
this.symbol = symbol;
}
public int getDecimals() {
return decimals;
}
public void setDecimals(int decimals) {
this.decimals = decimals;
}
@Override
public String toString() {
return String.format("Token[%s \"%s\" (%s) .%d]", this.address, this.name, this.symbol, this.decimals);
}
}

View File

@@ -0,0 +1,69 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.core.methods.response.EthBlock;
import stream.dto.BlockHash;
import stream.dto.BlockId;
import stream.dto.BlockNumber;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
private transient Web3j w3;
private static final AtomicInteger activeOperations = new AtomicInteger(0);
private static final AtomicInteger totalOperations = new AtomicInteger(0);
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
}
@Override
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
int active = activeOperations.incrementAndGet();
int total = totalOperations.incrementAndGet();
if (total % 100 == 0) {
log.info("BlockElaborator - Total operations: {}, Active operations: {}", total, active);
}
CompletableFuture.supplyAsync(() -> {
try {
return getBlock(blockId);
} catch (Exception e) {
log.error("Failed to get block {} on chain {}", blockId, e);
throw new RuntimeException("Error processing block " + blockId, e);
}
}).thenAccept(result -> {
activeOperations.decrementAndGet();
resultFuture.complete(Collections.singleton(result));
}).exceptionally(throwable -> {
activeOperations.decrementAndGet();
resultFuture.completeExceptionally(throwable);
return null;
});
}
private EthBlock getBlock(BlockId id) {
try {
return (id instanceof BlockNumber) ?
w3.ethGetBlockByNumber(new DefaultBlockParameterNumber(((BlockNumber) id).number), false).sendAsync().get() :
w3.ethGetBlockByHash(((BlockHash) id).hash, false).sendAsync().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,80 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.core.methods.response.EthBlock;
import stream.dto.BlockHash;
import stream.dto.SwapEventLog;
import stream.dto.SwapWithTimestamp;
import java.math.BigInteger;
import java.util.Collections;
public class BlockTimestampElaborator extends RichAsyncFunction<SwapEventLog, SwapWithTimestamp> {
private static final Logger log = LoggerFactory.getLogger(BlockTimestampElaborator.class);
private transient BlockElaborator blockElaborator;
@Override
public void open(OpenContext openContext) throws Exception {
blockElaborator = new BlockElaborator();
blockElaborator.setRuntimeContext(getRuntimeContext());
blockElaborator.open(openContext);
}
@Override
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<SwapWithTimestamp> resultFuture) {
try {
BlockHash blockHash = new BlockHash();
blockHash.hash = swapEvent.blockHash;
// Create a custom ResultFuture to handle the block elaboration result
ResultFuture<EthBlock> blockResultFuture = new ResultFuture<EthBlock>() {
@Override
public void complete(java.util.Collection<EthBlock> result) {
try {
if (!result.isEmpty()) {
EthBlock ethBlock = result.iterator().next();
BigInteger timestamp = ethBlock.getBlock().getTimestamp();
SwapWithTimestamp swapWithTimestamp = new SwapWithTimestamp(swapEvent, timestamp);
resultFuture.complete(Collections.singleton(swapWithTimestamp));
} else {
log.error("No block found for block hash {}", swapEvent.blockHash);
resultFuture.completeExceptionally(new RuntimeException("No block found"));
}
} catch (Exception e) {
log.error("Failed to get timestamp for block {} of swap in tx {}",
swapEvent.blockHash, swapEvent.transactionHash, e);
resultFuture.completeExceptionally(new RuntimeException("Error getting block timestamp", e));
}
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<EthBlock> collectionSupplier) {
try {
complete(collectionSupplier.get());
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
log.error("Failed to get block {} for swap in tx {}",
swapEvent.blockHash, swapEvent.transactionHash, error);
resultFuture.completeExceptionally(error);
}
};
// Invoke the block elaborator asynchronously
blockElaborator.asyncInvoke(blockHash, blockResultFuture);
} catch (Exception e) {
log.error("Error in BlockTimestampElaborator for swap in tx {}",
swapEvent.transactionHash, e);
resultFuture.completeExceptionally(e);
}
}
}

View File

@@ -1,7 +1,9 @@
package stream.io;
import java.util.HexFormat;
import org.bouncycastle.jcajce.provider.digest.Keccak;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
public class EthUtils {
public static boolean isValidAddress(String address) {
@@ -35,6 +37,6 @@ public class EthUtils {
public static String keccak(String message) {
byte[] hash = new Keccak.Digest256().digest(message.getBytes());
return "0x"+ByteUtils.toHexString(hash);
return "0x" + HexFormat.of().formatHex(hash);
}
}

View File

@@ -0,0 +1,34 @@
package stream.io;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.JedisPooled;
import java.time.Duration;
import java.util.Map;
public class JedisClient {
protected static volatile JedisPooled jedis = null;
private static final Object jedisLock = new Object();
static public JedisPooled get(Map<String, String> params) {
if (jedis == null) {
synchronized (jedisLock) {
String url = params.getOrDefault("redis_url", "http://localhost:6379");
int connections = Integer.parseInt(params.getOrDefault("redis_connections", "8"));
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(connections);
poolConfig.setMaxIdle(connections);
poolConfig.setMinIdle(0);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWait(Duration.ofSeconds(5));
// Enables sending a PING command periodically while the connection is idle.
poolConfig.setTestWhileIdle(true);
// controls the period between checks for idle connections in the pool
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(10));
jedis = new JedisPooled(poolConfig, url);
}
}
return jedis;
}
}

View File

@@ -1,52 +0,0 @@
package stream.io;
public class PoolTokenFetcher {
// Uniswap V3 pool method signatures
private static final String TOKEN0_SIGNATURE = "0x0dfe1681"; // token0()
private static final String TOKEN1_SIGNATURE = "0xd21220a7"; // token1()
private final JsonRpcClient jsonRpcClient;
public PoolTokenFetcher(JsonRpcClient jsonRpcClient) {
this.jsonRpcClient = jsonRpcClient;
}
public String fetchToken0(String poolAddress) throws Throwable {
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN0_SIGNATURE);
Object[] params = new Object[]{request, "latest"};
String result = jsonRpcClient.invoke("eth_call", params, String.class);
return decodeAddress(result);
}
public String fetchToken1(String poolAddress) throws Throwable {
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN1_SIGNATURE);
Object[] params = new Object[]{request, "latest"};
String result = jsonRpcClient.invoke("eth_call", params, String.class);
return decodeAddress(result);
}
private String decodeAddress(String hexResult) {
if (hexResult == null || hexResult.equals("0x") || hexResult.length() < 42) {
return null;
}
// Address is the last 20 bytes (40 hex chars) of the result
// Remove 0x prefix and get last 40 characters, then add 0x back
String hex = hexResult.substring(2);
if (hex.length() >= 40) {
return "0x" + hex.substring(hex.length() - 40);
}
return hexResult; // Return as-is if unexpected format
}
private static class EthCallRequest {
public final String to;
public final String data;
public EthCallRequest(String to, String data) {
this.to = to;
this.data = data;
}
}
}

View File

@@ -0,0 +1,81 @@
package stream.io;
import stream.dto.SwapWithTimestamp;
import stream.dto.SwapEventWithTokenIds;
import stream.contract.UniswapV3Pool;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.web3j.crypto.Credentials;
import org.web3j.tx.gas.DefaultGasProvider;
import java.util.concurrent.CompletableFuture;
import java.util.Collections;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class PoolTokenIdElaborator extends RichAsyncFunction<SwapWithTimestamp, SwapEventWithTokenIds> {
private static final Logger log = LoggerFactory.getLogger(PoolTokenIdElaborator.class);
private transient Web3j web3j;
private transient Credentials credentials;
private transient DefaultGasProvider gasProvider;
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
// Get RPC URL from job parameters
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
// TODO: Get from configuration if needed
// Initialize Web3j
this.web3j = Web3j.build(new HttpService(rpcUrl));
// Dummy credentials for read-only operations
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
// Default gas provider
this.gasProvider = new DefaultGasProvider();
}
@Override
public void asyncInvoke(SwapWithTimestamp swapWithTimestamp, ResultFuture<SwapEventWithTokenIds> resultFuture) throws Exception {
CompletableFuture.supplyAsync(() -> {
try {
// Load the pool contract
UniswapV3Pool pool = UniswapV3Pool.load(
swapWithTimestamp.getSwapEvent().getAddress(), // Pool address from the event
web3j,
credentials,
gasProvider
);
// Get token addresses
String token0 = pool.token0().send();
String token1 = pool.token1().send();
// Create enriched event with timestamp
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), token0, token1, swapWithTimestamp.getTimestamp());
} catch (Exception e) {
log.error("Error fetching pool tokens for swap in pool {}",
swapWithTimestamp.getSwapEvent().getAddress(), e);
// Return original without enrichment but with timestamp
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), null, null, swapWithTimestamp.getTimestamp());
}
}).thenAccept(enriched -> {
resultFuture.complete(Collections.singletonList(enriched));
});
}
@Override
public void close() throws Exception {
if (web3j != null) {
web3j.shutdown();
}
}
}

View File

@@ -0,0 +1,89 @@
package stream.io;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.*;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.math.RoundingMode;
public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata, Swap> {
private static final Logger log = LoggerFactory.getLogger(SwapElaborator.class);
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
@Override
public Swap map(SwapEventWithTokenMetadata swapWithMetadata) throws Exception {
SwapEventWithTokenIds event = swapWithMetadata.getSwapEvent();
SwapEventLog swapLog = event.getSwapEvent();
Token token0 = swapWithMetadata.getToken0();
Token token1 = swapWithMetadata.getToken1();
// Use timestamp from block elaboration and set exchange as UNISWAP_V3
Long time = event.getTimestamp().longValue();
Exchange exchange = Exchange.UNISWAP_V3;
// Pool address
String pool = swapLog.address;
// Get sqrtPriceX96 and calculate actual price
BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96);
BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT);
BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT);
// Adjust price for decimals (price is token1/token0)
int decimalDiff = token0.decimals - token1.decimals;
BigDecimal adjustedPrice;
if (decimalDiff >= 0) {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff);
adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT);
} else {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
}
// Determine which token is in and which is out based on amount signs
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
String takerAsset;
String makerAsset;
BigDecimal amountIn;
BigDecimal amountOut;
BigDecimal finalPrice;
if (isToken0In) {
// User is sending token0, receiving token1
takerAsset = event.getToken0Address();
makerAsset = event.getToken1Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount0.abs())
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
amountOut = new BigDecimal(swapLog.amount1)
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
// Price is how much token1 you get per token0
finalPrice = adjustedPrice;
} else {
// User is sending token1, receiving token0
takerAsset = event.getToken1Address();
makerAsset = event.getToken0Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount1.abs())
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
amountOut = new BigDecimal(swapLog.amount0)
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
// We need to calculate price as token1/token0 to be consistent with token0 as quote currency
// Currently adjustedPrice = token1/token0, so we keep it as is
// This ensures price always represents how many token0 (quote) per 1 token1 (base)
// For example, in WETH/USDC: price = 2000 means 1 WETH costs 2000 USDC
finalPrice = adjustedPrice;
}
// Pass both amountIn and amountOut to constructor with unit price
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
}
}

View File

@@ -1,161 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.EnrichedSwapEventLog;
import stream.dto.SwapEventLog;
import stream.dto.Token;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class SwapEventEnricher extends RichAsyncFunction<SwapEventLog, EnrichedSwapEventLog> {
private transient JsonRpcClient jsonRpcClient;
private transient PoolTokenFetcher poolTokenFetcher;
private transient TokenMetadataFetcher tokenMetadataFetcher;
private transient TokenCache tokenCache;
private final String rpcUrl;
public SwapEventEnricher(String rpcUrl) {
this.rpcUrl = rpcUrl;
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
this.poolTokenFetcher = new PoolTokenFetcher(jsonRpcClient);
this.tokenMetadataFetcher = new TokenMetadataFetcher(rpcUrl);
this.tokenMetadataFetcher.open(openContext);
this.tokenCache = new TokenCache();
}
@Override
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<EnrichedSwapEventLog> resultFuture) throws Exception {
try {
// First, get token addresses if not already populated
CompletableFuture<Void> tokenAddressFuture;
if (swapEvent.token0 == null || swapEvent.token1 == null) {
tokenAddressFuture = CompletableFuture.runAsync(() -> {
try {
if (swapEvent.token0 == null) {
swapEvent.token0 = poolTokenFetcher.fetchToken0(swapEvent.address);
}
if (swapEvent.token1 == null) {
swapEvent.token1 = poolTokenFetcher.fetchToken1(swapEvent.address);
}
} catch (Throwable e) {
throw new RuntimeException("Error fetching token addresses: " + e.getMessage(), e);
}
});
} else {
tokenAddressFuture = CompletableFuture.completedFuture(null);
}
// Then fetch metadata for both tokens
tokenAddressFuture.thenCompose(v -> {
if (swapEvent.token0 == null || swapEvent.token1 == null) {
return CompletableFuture.failedFuture(new RuntimeException("Failed to get token addresses"));
}
// Fetch metadata for both tokens concurrently (with caching)
CompletableFuture<Token> token0MetaFuture = tokenCache.getOrFetch(swapEvent.token0, tokenMetadataFetcher);
CompletableFuture<Token> token1MetaFuture = tokenCache.getOrFetch(swapEvent.token1, tokenMetadataFetcher);
return CompletableFuture.allOf(token0MetaFuture, token1MetaFuture)
.thenApply(ignored -> {
try {
Token token0Meta = token0MetaFuture.get();
Token token1Meta = token1MetaFuture.get();
// Create enriched event with token metadata
EnrichedSwapEventLog enriched = EnrichedSwapEventLog.fromSwapEvent(swapEvent);
// Set token metadata objects
if (token0Meta != null) {
enriched.token0Metadata = token0Meta;
// Also set the address in the Token object
token0Meta.address = swapEvent.token0;
}
if (token1Meta != null) {
enriched.token1Metadata = token1Meta;
// Also set the address in the Token object
token1Meta.address = swapEvent.token1;
}
return enriched;
} catch (Exception e) {
throw new RuntimeException("Error creating enriched event: " + e.getMessage(), e);
}
});
})
.thenAccept(enrichedEvent -> {
resultFuture.complete(Collections.singleton(enrichedEvent));
})
.exceptionally(throwable -> {
System.err.println("Error enriching swap event: " + throwable.getMessage());
throwable.printStackTrace();
resultFuture.completeExceptionally(throwable);
return null;
});
// Log cache stats periodically (every 100 events)
if (System.currentTimeMillis() % 10000 < 100) {
System.out.println("Token cache stats: " + tokenCache.getStats());
}
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
private static class MockResultFuture implements ResultFuture<Token> {
private Token result;
private Throwable error;
private boolean complete = false;
@Override
public void complete(java.util.Collection<Token> results) {
if (!results.isEmpty()) {
this.result = results.iterator().next();
}
this.complete = true;
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
try {
java.util.Collection<Token> resultCollection = results.get();
complete(resultCollection);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
this.error = error;
this.complete = true;
}
public boolean isComplete() {
return complete;
}
public boolean hasError() {
return error != null;
}
public Token getResult() {
return result;
}
public Throwable getError() {
return error;
}
}
}

View File

@@ -1,142 +0,0 @@
package stream.io;
import stream.dto.Token;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
/**
* Thread-safe cache for token metadata to avoid repeated RPC calls
* for the same token addresses.
*/
public class TokenCache {
private final ConcurrentHashMap<String, Token> cache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CompletableFuture<Token>> pendingRequests = new ConcurrentHashMap<>();
/**
* Get token from cache if present, otherwise return null
*/
public Token get(String tokenAddress) {
return cache.get(tokenAddress.toLowerCase());
}
/**
* Put token in cache
*/
public void put(String tokenAddress, Token token) {
if (token != null && tokenAddress != null) {
cache.put(tokenAddress.toLowerCase(), token);
}
}
/**
* Check if token is already in cache
*/
public boolean contains(String tokenAddress) {
return cache.containsKey(tokenAddress.toLowerCase());
}
/**
* Get or fetch token metadata with deduplication.
* If multiple requests come for the same token simultaneously,
* only one will fetch and others will wait for the result.
*/
public CompletableFuture<Token> getOrFetch(String tokenAddress, TokenMetadataFetcher fetcher) {
String normalizedAddress = tokenAddress.toLowerCase();
// Check cache first
Token cachedToken = cache.get(normalizedAddress);
if (cachedToken != null) {
return CompletableFuture.completedFuture(cachedToken);
}
// Check if there's already a pending request for this token
CompletableFuture<Token> pendingFuture = pendingRequests.get(normalizedAddress);
if (pendingFuture != null) {
return pendingFuture;
}
// Create new request
CompletableFuture<Token> future = new CompletableFuture<>();
// Register the pending request
CompletableFuture<Token> existingFuture = pendingRequests.putIfAbsent(normalizedAddress, future);
if (existingFuture != null) {
// Another thread beat us to it, return their future
return existingFuture;
}
// We're the first, so fetch the token metadata
try {
fetcher.asyncInvoke(tokenAddress, new org.apache.flink.streaming.api.functions.async.ResultFuture<Token>() {
@Override
public void complete(java.util.Collection<Token> results) {
Token token = results.isEmpty() ? null : results.iterator().next();
// Cache the result (even if null to avoid repeated failures)
if (token != null) {
cache.put(normalizedAddress, token);
}
// Complete the future
future.complete(token);
// Remove from pending requests
pendingRequests.remove(normalizedAddress);
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
try {
java.util.Collection<Token> resultCollection = results.get();
complete(resultCollection);
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
future.completeExceptionally(error);
pendingRequests.remove(normalizedAddress);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
pendingRequests.remove(normalizedAddress);
}
return future;
}
/**
* Get cache statistics for monitoring
*/
public CacheStats getStats() {
return new CacheStats(cache.size(), pendingRequests.size());
}
/**
* Clear all cached entries (useful for testing or memory management)
*/
public void clear() {
cache.clear();
// Note: We don't clear pending requests as they're in flight
}
public static class CacheStats {
public final int cachedTokens;
public final int pendingRequests;
public CacheStats(int cachedTokens, int pendingRequests) {
this.cachedTokens = cachedTokens;
this.pendingRequests = pendingRequests;
}
@Override
public String toString() {
return String.format("TokenCache[cached=%d, pending=%d]", cachedTokens, pendingRequests);
}
}
}

View File

@@ -0,0 +1,66 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.tx.ReadonlyTransactionManager;
import org.web3j.tx.gas.DefaultGasProvider;
import stream.contract.ERC20;
import stream.dto.AddressId;
import stream.dto.Token;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TokenElaborator extends RichAsyncFunction<AddressId, Token> {
private static final Logger log = LoggerFactory.getLogger(TokenElaborator.class);
private transient Web3j w3;
private DefaultGasProvider gasProvider;
private ReadonlyTransactionManager transactionManager;
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
gasProvider = new DefaultGasProvider();
transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234");
}
@Override
public void asyncInvoke(AddressId address, final ResultFuture<Token> resultFuture) {
log.info("Starting token processing for address: {} on chain: {}", address.address, address.chainId);
CompletableFuture.supplyAsync(() -> {
try {
return processToken(address);
} catch (Exception e) {
log.error("Failed to process token: {} on chain: {}", address.address, address.chainId, e);
throw new RuntimeException("Error processing token: " + address, e);
}
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
}
private Token processToken(AddressId address) {
String name;
String symbol;
int decimals;
ERC20 contract = ERC20.load(address.address, w3, transactionManager, gasProvider);
try {
CompletableFuture<BigInteger> decimalsAsync = contract.decimals().sendAsync();
CompletableFuture<String> nameAsync = contract.name().sendAsync();
CompletableFuture<String> symbolAsync = contract.symbol().sendAsync();
decimals = decimalsAsync.get().intValue();
name = nameAsync.get();
symbol = symbolAsync.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return new Token(address.chainId, address.address, name, symbol, decimals);
}
}

View File

@@ -1,166 +0,0 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.Token;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class TokenMetadataFetcher extends RichAsyncFunction<String, Token> {
// ERC-20 method signatures
private static final String NAME_SIGNATURE = "0x06fdde03";
private static final String SYMBOL_SIGNATURE = "0x95d89b41";
private static final String DECIMALS_SIGNATURE = "0x313ce567";
private static final String LATEST_BLOCK = "latest";
private transient JsonRpcClient jsonRpcClient;
private final String rpcUrl;
public TokenMetadataFetcher(String rpcUrl) {
this.rpcUrl = rpcUrl;
}
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
}
@Override
public void asyncInvoke(String tokenAddress, ResultFuture<Token> resultFuture) throws Exception {
try {
// Fetch all metadata concurrently
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchName(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> symbolFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchSymbol(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
CompletableFuture<String> decimalsFuture = CompletableFuture.supplyAsync(() -> {
try {
return fetchDecimals(tokenAddress);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
// Combine all results
CompletableFuture.allOf(nameFuture, symbolFuture, decimalsFuture)
.thenAccept(v -> {
try {
Token token = new Token();
token.setName(decodeString(nameFuture.get()));
token.setSymbol(decodeString(symbolFuture.get()));
token.setDecimals(decodeUint8(decimalsFuture.get()));
resultFuture.complete(Collections.singleton(token));
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
})
.exceptionally(throwable -> {
resultFuture.completeExceptionally(throwable);
return null;
});
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
}
private String fetchName(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, NAME_SIGNATURE);
}
private String fetchSymbol(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, SYMBOL_SIGNATURE);
}
private String fetchDecimals(String tokenAddress) throws Throwable {
return makeEthCall(tokenAddress, DECIMALS_SIGNATURE);
}
private String makeEthCall(String toAddress, String data) throws Throwable {
EthCallRequest request = new EthCallRequest(toAddress, data);
Object[] params = new Object[]{request, LATEST_BLOCK};
return jsonRpcClient.invoke("eth_call", params, String.class);
}
private String decodeString(String hexData) {
if (hexData == null || hexData.equals("0x") || hexData.length() < 66) {
return "Unknown";
}
try {
// Remove 0x prefix
String hex = hexData.substring(2);
// Skip the first 32 bytes (offset pointer)
// The actual string length is at bytes 32-64
String lengthHex = hex.substring(64, 128);
int length = Integer.parseInt(lengthHex, 16);
if (length <= 0 || length > 100) { // Sanity check
return "Invalid";
}
// Extract the actual string data starting at byte 64
String dataHex = hex.substring(128, 128 + (length * 2));
// Convert hex to string
StringBuilder result = new StringBuilder();
for (int i = 0; i < dataHex.length(); i += 2) {
String hexChar = dataHex.substring(i, i + 2);
int charCode = Integer.parseInt(hexChar, 16);
if (charCode > 0) { // Skip null bytes
result.append((char) charCode);
}
}
return result.toString();
} catch (Exception e) {
System.err.println("Error decoding string from hex: " + hexData + " - " + e.getMessage());
return "DecodeError";
}
}
private int decodeUint8(String hexData) {
if (hexData == null || hexData.equals("0x")) {
return 0;
}
try {
// Remove 0x prefix and get last byte (uint8)
String hex = hexData.substring(2);
// uint8 is the last 2 hex characters (1 byte)
String uint8Hex = hex.substring(hex.length() - 2);
return Integer.parseInt(uint8Hex, 16);
} catch (Exception e) {
System.err.println("Error decoding uint8 from hex: " + hexData + " - " + e.getMessage());
return 0;
}
}
private static class EthCallRequest {
public final String to;
public final String data;
public EthCallRequest(String to, String data) {
this.to = to;
this.data = data;
}
}
}

View File

@@ -0,0 +1,50 @@
package stream.io;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.config.StreamingDefaults;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.MODERN_TLS;
public class Web3Client {
private static final Logger log = LoggerFactory.getLogger(Web3Client.class);
private static volatile Web3j w3 = null;
private static final Object w3Lock = new Object();
static public Web3j get(Map<String, String> params) {
if (w3==null) {
synchronized (w3Lock) {
log.info("Initializing Web3 client");
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", String.valueOf(StreamingDefaults.MAX_IDLE_CONNECTIONS)));
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", String.valueOf(StreamingDefaults.KEEP_ALIVE_MINUTES)));
Duration timeout = Duration.ofSeconds(30);
ConnectionPool connectionPool = new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES);
log.info("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
url, maxIdleConnections, keepAlive, timeout.getSeconds());
var httpClient = new OkHttpClient.Builder()
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
.connectionPool(connectionPool)
.build();
w3 = Web3j.build(new HttpService(url, httpClient));
log.info("Web3 client initialized successfully");
}
}
return w3;
}
}

View File

@@ -0,0 +1,41 @@
package stream.ohlc;
import java.math.BigDecimal;
public class OHLCAccumulator {
public String pool = null;
public String token0 = null; // Added
public String token1 = null; // Added
public BigDecimal open = null;
public BigDecimal high = null;
public BigDecimal low = null;
public BigDecimal close = null;
public BigDecimal volume = BigDecimal.ZERO;
public int tradeCount = 0;
public long firstTradeTime = 0;
public long lastTradeTime = 0;
public OHLCAccumulator() {}
public void updatePrice(BigDecimal price, long timestamp) {
if (open == null) {
open = price;
high = price;
low = price;
firstTradeTime = timestamp;
} else {
high = high.max(price);
low = low.min(price);
}
close = price;
lastTradeTime = timestamp;
}
public void addVolume(BigDecimal amount) {
volume = volume.add(amount);
}
public void incrementTradeCount() {
tradeCount++;
}
}

View File

@@ -0,0 +1,98 @@
package stream.ohlc;
import org.apache.flink.api.common.functions.AggregateFunction;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
import java.math.BigDecimal;
public class OHLCAggregator implements AggregateFunction<Swap, OHLCAccumulator, OHLCCandle> {
private final long windowSize = 60; // 1 minute in seconds
@Override
public OHLCAccumulator createAccumulator() {
return new OHLCAccumulator();
}
@Override
public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) {
// Initialize pool and tokens on first swap
if (accumulator.pool == null) {
accumulator.pool = swap.getPool();
// Store tokens in consistent order (you could sort by address)
accumulator.token0 = swap.getTakerAsset();
accumulator.token1 = swap.getMakerAsset();
}
// Update OHLC prices
accumulator.updatePrice(swap.getPrice(), swap.getTime());
// Calculate volume (using the taker's input amount as volume)
BigDecimal volume = swap.getAmountIn().abs();
accumulator.addVolume(volume);
// Increment trade count
accumulator.incrementTradeCount();
return accumulator;
}
@Override
public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) {
OHLCAccumulator merged = new OHLCAccumulator();
// Merge pool and token info
merged.pool = acc1.pool != null ? acc1.pool : acc2.pool;
merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0;
merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1;
// Merge OHLC data
if (acc1.open != null && acc2.open != null) {
merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open;
merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close;
merged.high = acc1.high.max(acc2.high);
merged.low = acc1.low.min(acc2.low);
merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime);
merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime);
} else if (acc1.open != null) {
merged.open = acc1.open;
merged.close = acc1.close;
merged.high = acc1.high;
merged.low = acc1.low;
merged.firstTradeTime = acc1.firstTradeTime;
merged.lastTradeTime = acc1.lastTradeTime;
} else if (acc2.open != null) {
merged.open = acc2.open;
merged.close = acc2.close;
merged.high = acc2.high;
merged.low = acc2.low;
merged.firstTradeTime = acc2.firstTradeTime;
merged.lastTradeTime = acc2.lastTradeTime;
}
merged.volume = acc1.volume.add(acc2.volume);
merged.tradeCount = acc1.tradeCount + acc2.tradeCount;
return merged;
}
@Override
public OHLCCandle getResult(OHLCAccumulator accumulator) {
long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize;
long windowEnd = windowStart + windowSize;
return new OHLCCandle(
accumulator.pool,
accumulator.token0,
accumulator.token1,
windowStart,
windowEnd,
accumulator.open,
accumulator.high,
accumulator.low,
accumulator.close,
accumulator.volume,
accumulator.tradeCount
);
}
}

View File

@@ -0,0 +1,26 @@
package stream.ohlc;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import java.time.Duration;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
public class OHLCPipeline {
public DataStream<OHLCCandle> createOHLCStream(DataStream<Swap> swapStream) {
return swapStream
// NO watermarks needed for processing-time windows
.keyBy(Swap::getPool)
// Use 1-minute processing-time windows
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
// Simply use the aggregator
.aggregate(new OHLCAggregator())
// Filter out empty windows
.filter(candle -> candle.getTradeCount() > 0);
}
}