Compare commits

15 Commits

Author SHA1 Message Date
255d8f126d updating stream defauls like aysnc calls, wait timeout, max idle connection and keep alive counts to ensure the stream doesn't fail. Moving everything to a config file to easy updates 2025-10-10 14:54:16 -04:00
88fce17efc OHLC pipeline stuff 2025-10-09 13:06:22 -04:00
b4f1de9d0d removing price inversion in the swap ellaborator since we always want to represent the vlaue of the swap in its quote curency 2025-10-09 13:00:07 -04:00
0d8df3df9c adding renamed elaborators 2025-10-08 13:22:50 -04:00
2fc85c688b Using timestmap from blockhash. Also renaming elaborators to be more descriptive and removing unneeded logs and code 2025-10-08 13:16:35 -04:00
47fe85b50b Swap object stream with verified amount in and amount out 2025-10-07 20:11:01 -04:00
surbhi
6b4cad1479 Remove .mvn/jvm.config file 2025-10-06 19:25:35 -04:00
04d441d6b1 Remove SSH key files from repository 2025-10-06 19:18:33 -04:00
sjhavar
00ae425700 Add token metadata logging to swap event stream
- Integrated TokenElaborator to enrich swap events with token metadata
- Added comprehensive logging showing pool, token addresses, and metadata
- Increased timeout to handle rate limiting from RPC provider
- Simplified log output removing emoji formatting
2025-10-06 16:03:53 -04:00
sjhavar
e58fdc0b06 Add PoolElaborator for token address enrichment
- Created PoolElaborator async function to retrieve token0 and token1 addresses from Uniswap v3 pools
- Added ElaboratedSwapEvent DTO to store swap events with token addresses
- Updated SwapEventLog with getAddress() method for pool contract access
- Integrated PoolElaborator into DataStreamJob pipeline for real-time token enrichment
- Configured dynamic RPC URL retrieval from job parameters
2025-10-06 16:03:53 -04:00
tim
7449b4b4b1 BlockElaborator bugfix 2025-10-06 14:41:18 -04:00
tim
6829053d94 BlockElaborator 2025-10-06 12:33:45 -04:00
tim
3a64f5630c UniswapV3Pool wrapper 2025-10-06 12:00:07 -04:00
tim
9f65c2e850 Merge remote-tracking branch 'origin/main' 2025-09-28 14:10:00 -04:00
tim
8c672f250e TokenElaborator 2025-09-28 14:09:16 -04:00
29 changed files with 2312 additions and 92 deletions

35
pom.xml
View File

@@ -35,9 +35,11 @@ under the License.
<scala.binary.version>2.12</scala.binary.version> <scala.binary.version>2.12</scala.binary.version>
<flink.version>2.1.0</flink.version> <flink.version>2.1.0</flink.version>
<log4j.version>2.24.1</log4j.version> <log4j.version>2.24.1</log4j.version>
<web3j.version>5.0.0</web3j.version> <web3j.version>5.0.1</web3j.version>
<httpclient5.version>5.3.1</httpclient5.version> <httpclient5.version>5.3.1</httpclient5.version>
<jsonrpc4j.version>1.6</jsonrpc4j.version> <jsonrpc4j.version>1.6</jsonrpc4j.version>
<jedis.version>6.1.0</jedis.version>
<jackson.version>2.17.1</jackson.version>
</properties> </properties>
<repositories> <repositories>
@@ -55,7 +57,23 @@ under the License.
</repositories> </repositories>
<dependencies> <dependencies>
<!-- Apache Flink dependencies --> <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. --> <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
@@ -132,7 +150,14 @@ under the License.
<version>${log4j.version}</version> <version>${log4j.version}</version>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
</dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
</dependencies>
<build> <build>
<plugins> <plugins>
@@ -143,8 +168,8 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version> <version>3.1</version>
<configuration> <configuration>
<source>16</source> <source>${target.java.version}</source>
<target>16</target> <target>${target.java.version}</target>
</configuration> </configuration>
</plugin> </plugin>

View File

@@ -19,17 +19,32 @@
package stream; package stream;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.ParameterTool; import org.apache.flink.util.ParameterTool;
import org.apache.flink.util.Collector;
import stream.config.StreamingDefaults;
import stream.dto.*; import stream.dto.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import stream.io.PoolTokenIdElaborator;
import stream.io.TokenElaborator;
import stream.io.SwapElaborator;
import stream.io.BlockTimestampElaborator;
import stream.ohlc.OHLCPipeline;
import stream.source.eventlog.EventLogSourceFactory; import stream.source.eventlog.EventLogSourceFactory;
import stream.source.newheads.NewHeadsSourceFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -57,89 +72,150 @@ public class DataStreamJob {
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority .mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// do not do this until considering how secrets are handled by flink env.getConfig().setGlobalJobParameters(parameters);
// env.getConfig().setGlobalJobParameters(parameters);
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546")); URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
// Async operation parameters
int asyncCapacity = parameters.getInt("async.capacity", StreamingDefaults.ASYNC_CAPACITY);
int asyncTimeoutSeconds = parameters.getInt("async.timeout.seconds", StreamingDefaults.ASYNC_TIMEOUT_SECONDS);
log.info("Async configuration - Capacity: {}, Timeout: {}s", asyncCapacity, asyncTimeoutSeconds);
// Create ObjectMapper for pretty JSON printing // Create ObjectMapper for pretty JSON printing
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT); mapper.enable(SerializationFeature.INDENT_OUTPUT);
DataStream<ArbitrumOneBlock> arbitrumHeads = env
.fromSource(
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
"ArbitrumOne Head Blocks",
TypeInformation.of(ArbitrumOneBlock.class)
);
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events"); DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
// Map the blocks to pretty-printed JSON strings // Add block timestamp to swap events
/* SingleOutputStreamOperator<SwapWithTimestamp> swapWithTimestampStream = AsyncDataStream.unorderedWait(
blockStream swapStream,
.map(block -> { new BlockTimestampElaborator(),
try { asyncTimeoutSeconds,
return mapper.writeValueAsString(block); TimeUnit.SECONDS,
} catch (Exception e) { asyncCapacity
return "Error converting block to JSON: " + e.getMessage(); );
}
SingleOutputStreamOperator<SwapEventWithTokenIds> elaboratedSwapStream = AsyncDataStream.unorderedWait(
swapWithTimestampStream,
new PoolTokenIdElaborator(),
asyncTimeoutSeconds,
TimeUnit.SECONDS,
asyncCapacity
);
// Extract token addresses and elaborate with metadata
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
.flatMap(new FlatMapFunction<SwapEventWithTokenIds, AddressId>() {
@Override
public void flatMap(SwapEventWithTokenIds event, Collector<AddressId> collector) throws Exception {
collector.collect(new AddressId(42161, event.getToken0Address()));
collector.collect(new AddressId(42161, event.getToken1Address()));
}
});
// Elaborate tokens with metadata
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
tokenAddresses,
new TokenElaborator(),
asyncTimeoutSeconds,
TimeUnit.SECONDS,
asyncCapacity
);
// Connect swap events with token metadata to create SwapEventWithTokenMetadata
DataStream<SwapEventWithTokenMetadata> swapsWithTokens = elaboratedSwapStream
.connect(elaboratedTokens)
.flatMap(new RichCoFlatMapFunction<SwapEventWithTokenIds, Token, SwapEventWithTokenMetadata>() {
private final Map<String, Token> tokenCache = new HashMap<>();
private final Map<String, SwapEventWithTokenIds> pendingSwaps = new HashMap<>();
@Override
public void flatMap1(SwapEventWithTokenIds event, Collector<SwapEventWithTokenMetadata> out) throws Exception {
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
// We have both tokens, create SwapEventWithTokenMetadata
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
out.collect(swapWithMetadata);
} else {
// Cache the swap event until we get both tokens
pendingSwaps.put(event.getSwapEvent().transactionHash, event);
}
}
@Override
public void flatMap2(Token token, Collector<SwapEventWithTokenMetadata> out) throws Exception {
// Cache the token
tokenCache.put(token.address.toLowerCase(), token);
// Check if any pending swaps can now be completed
Iterator<Map.Entry<String, SwapEventWithTokenIds>> iterator = pendingSwaps.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SwapEventWithTokenIds> entry = iterator.next();
SwapEventWithTokenIds event = entry.getValue();
String token0Addr = event.getToken0Address().toLowerCase();
String token1Addr = event.getToken1Address().toLowerCase();
Token token0 = tokenCache.get(token0Addr);
Token token1 = tokenCache.get(token1Addr);
if (token0 != null && token1 != null) {
SwapEventWithTokenMetadata swapWithMetadata = new SwapEventWithTokenMetadata(event, token0, token1);
out.collect(swapWithMetadata);
iterator.remove();
}
}
}
});
// Apply SwapElaborator to create final Swap objects with calculated prices
DataStream<Swap> swaps = swapsWithTokens
.map(new SwapElaborator());
// Test OHLC with USDC/WETH pool
OHLCPipeline ohlcPipeline = new OHLCPipeline();
DataStream<OHLCCandle> allOhlc = ohlcPipeline.createOHLCStream(swaps);
// Filter and print OHLC candles for USDC/WETH pool only
allOhlc
.filter(candle -> {
// Filter for specific USDC/WETH pool
String poolAddress = candle.getPool().toLowerCase();
String targetPool = "0x6f38e884725a116c9c7fbf208e79fe8828a2595f".toLowerCase();
return poolAddress.equals(targetPool);
}) })
.print("New Ethereum Block: "); .map(candle -> {
System.out.println("USDC/WETH OHLC: Pool=" + candle.getPool() +
" Window=" + candle.getWindowStart() + "-" + candle.getWindowEnd() +
" Trades=" + candle.getTradeCount() +
" Open=" + candle.getOpen() +
" High=" + candle.getHigh() +
" Low=" + candle.getLow() +
" Close=" + candle.getClose() +
" Volume=" + candle.getVolume());
return candle;
})
.setParallelism(1)
.returns(OHLCCandle.class)
.print("USDC-WETH-OHLC");
transferStream // Print the final enriched swap objects
.map(event -> { swaps
.map(swap -> {
try { try {
return mapper.writeValueAsString(event); String json = mapper.writeValueAsString(swap);
return json;
} catch (Exception e) { } catch (Exception e) {
return "Error converting transfer event to JSON: " + e.getMessage(); return "Error converting enriched swap to JSON: " + e.getMessage();
} }
}) })
.print("Transfer Event: "); .print("Enriched Swap: ");
swapStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting swap event to JSON: " + e.getMessage();
}
})
.print("Swap Event: ");
*/
mintStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting mint event to JSON: " + e.getMessage();
}
})
.print("Mint Event: ");
burnStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting burn event to JSON: " + e.getMessage();
}
})
.print("Burn Event: ");
swapStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting swap event to JSON: " + e.getMessage();
}
})
.print("Swap Event: ");
env.execute("Ethereum Block Stream"); env.execute("Ethereum Block Stream");
} }

View File

@@ -0,0 +1,7 @@
package stream.contract;
public class ArbitrumOne {
static public final int CHAIN_ID = 42161;
static public final String ADDR_USDC = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831";
}

View File

@@ -0,0 +1,263 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/hyperledger/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 4.1.1.
*/
public class ERC20 extends Contract {
private static final String BINARY = "Bin file was not provided";
public static final String FUNC_NAME = "name";
public static final String FUNC_APPROVE = "approve";
public static final String FUNC_TOTALSUPPLY = "totalSupply";
public static final String FUNC_TRANSFERFROM = "transferFrom";
public static final String FUNC_DECIMALS = "decimals";
public static final String FUNC_BALANCEOF = "balanceOf";
public static final String FUNC_SYMBOL = "symbol";
public static final String FUNC_TRANSFER = "transfer";
public static final String FUNC_ALLOWANCE = "allowance";
public static final Event TRANSFER_EVENT = new Event("Transfer",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
public static final Event APPROVAL_EVENT = new Event("Approval",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
;
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public RemoteCall<String> name() {
final Function function = new Function(FUNC_NAME,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> approve(String _spender, BigInteger _value) {
final Function function = new Function(
FUNC_APPROVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_spender),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> totalSupply() {
final Function function = new Function(FUNC_TOTALSUPPLY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<TransactionReceipt> transferFrom(String _from, String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFERFROM,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_from),
new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> decimals() {
final Function function = new Function(FUNC_DECIMALS,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<BigInteger> balanceOf(String _owner) {
final Function function = new Function(FUNC_BALANCEOF,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteCall<String> symbol() {
final Function function = new Function(FUNC_SYMBOL,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteCall<TransactionReceipt> transfer(String _to, BigInteger _value) {
final Function function = new Function(
FUNC_TRANSFER,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_to),
new org.web3j.abi.datatypes.generated.Uint256(_value)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteCall<BigInteger> allowance(String _owner, String _spender) {
final Function function = new Function(FUNC_ALLOWANCE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner),
new org.web3j.abi.datatypes.Address(_spender)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public List<TransferEventResponse> getTransferEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(TRANSFER_EVENT, transactionReceipt);
ArrayList<TransferEventResponse> responses = new ArrayList<TransferEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<TransferEventResponse> transferEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, TransferEventResponse>() {
@Override
public TransferEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(TRANSFER_EVENT, log);
TransferEventResponse typedResponse = new TransferEventResponse();
typedResponse.log = log;
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<TransferEventResponse> transferEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(TRANSFER_EVENT));
return transferEventFlowable(filter);
}
public List<ApprovalEventResponse> getApprovalEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(APPROVAL_EVENT, transactionReceipt);
ArrayList<ApprovalEventResponse> responses = new ArrayList<ApprovalEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
responses.add(typedResponse);
}
return responses;
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, ApprovalEventResponse>() {
@Override
public ApprovalEventResponse apply(Log log) {
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(APPROVAL_EVENT, log);
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
typedResponse.log = log;
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
return typedResponse;
}
});
}
public Flowable<ApprovalEventResponse> approvalEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(APPROVAL_EVENT));
return approvalEventFlowable(filter);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new ERC20(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, credentials, contractGasProvider);
}
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new ERC20(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class TransferEventResponse {
public Log log;
public String _from;
public String _to;
public BigInteger _value;
}
public static class ApprovalEventResponse {
public Log log;
public String _owner;
public String _spender;
public BigInteger _value;
}
}

View File

@@ -0,0 +1,965 @@
package stream.contract;
import io.reactivex.Flowable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import org.web3j.abi.EventEncoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.DynamicArray;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Function;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.generated.Int128;
import org.web3j.abi.datatypes.generated.Int24;
import org.web3j.abi.datatypes.generated.Int256;
import org.web3j.abi.datatypes.generated.Int56;
import org.web3j.abi.datatypes.generated.Uint128;
import org.web3j.abi.datatypes.generated.Uint16;
import org.web3j.abi.datatypes.generated.Uint160;
import org.web3j.abi.datatypes.generated.Uint24;
import org.web3j.abi.datatypes.generated.Uint256;
import org.web3j.abi.datatypes.generated.Uint32;
import org.web3j.abi.datatypes.generated.Uint8;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.RemoteFunctionCall;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.BaseEventResponse;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.tuples.generated.Tuple2;
import org.web3j.tuples.generated.Tuple3;
import org.web3j.tuples.generated.Tuple4;
import org.web3j.tuples.generated.Tuple5;
import org.web3j.tuples.generated.Tuple7;
import org.web3j.tuples.generated.Tuple8;
import org.web3j.tx.Contract;
import org.web3j.tx.TransactionManager;
import org.web3j.tx.gas.ContractGasProvider;
/**
* <p>Auto generated code.
* <p><strong>Do not modify!</strong>
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
* <a href="https://github.com/LFDT-web3j/web3j/tree/main/codegen">codegen module</a> to update.
*
* <p>Generated with web3j version 1.7.0.
*/
@SuppressWarnings("rawtypes")
public class UniswapV3Pool extends Contract {
public static final String BINARY = "Bin file was not provided";
public static final String FUNC_BURN = "burn";
public static final String FUNC_COLLECT = "collect";
public static final String FUNC_COLLECTPROTOCOL = "collectProtocol";
public static final String FUNC_FACTORY = "factory";
public static final String FUNC_FEE = "fee";
public static final String FUNC_FEEGROWTHGLOBAL0X128 = "feeGrowthGlobal0X128";
public static final String FUNC_FEEGROWTHGLOBAL1X128 = "feeGrowthGlobal1X128";
public static final String FUNC_FLASH = "flash";
public static final String FUNC_INCREASEOBSERVATIONCARDINALITYNEXT = "increaseObservationCardinalityNext";
public static final String FUNC_INITIALIZE = "initialize";
public static final String FUNC_LIQUIDITY = "liquidity";
public static final String FUNC_MAXLIQUIDITYPERTICK = "maxLiquidityPerTick";
public static final String FUNC_MINT = "mint";
public static final String FUNC_OBSERVATIONS = "observations";
public static final String FUNC_OBSERVE = "observe";
public static final String FUNC_POSITIONS = "positions";
public static final String FUNC_PROTOCOLFEES = "protocolFees";
public static final String FUNC_SETFEEPROTOCOL = "setFeeProtocol";
public static final String FUNC_SLOT0 = "slot0";
public static final String FUNC_SNAPSHOTCUMULATIVESINSIDE = "snapshotCumulativesInside";
public static final String FUNC_SWAP = "swap";
public static final String FUNC_TICKBITMAP = "tickBitmap";
public static final String FUNC_TICKSPACING = "tickSpacing";
public static final String FUNC_TICKS = "ticks";
public static final String FUNC_TOKEN0 = "token0";
public static final String FUNC_TOKEN1 = "token1";
public static final Event BURN_EVENT = new Event("Burn",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event COLLECT_EVENT = new Event("Collect",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>() {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
;
public static final Event COLLECTPROTOCOL_EVENT = new Event("CollectProtocol",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
;
public static final Event FLASH_EVENT = new Event("Flash",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event INCREASEOBSERVATIONCARDINALITYNEXT_EVENT = new Event("IncreaseObservationCardinalityNext",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}));
;
public static final Event INITIALIZE_EVENT = new Event("Initialize",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}));
;
public static final Event MINT_EVENT = new Event("Mint",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}, new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
;
public static final Event SETFEEPROTOCOL_EVENT = new Event("SetFeeProtocol",
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}));
;
public static final Event SWAP_EVENT = new Event("Swap",
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Int256>() {}, new TypeReference<Int256>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint128>() {}, new TypeReference<Int24>() {}));
;
@Deprecated
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice,
BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
}
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials,
ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
}
@Deprecated
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
BigInteger gasPrice, BigInteger gasLimit) {
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
ContractGasProvider contractGasProvider) {
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
}
public static List<BurnEventResponse> getBurnEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(BURN_EVENT, transactionReceipt);
ArrayList<BurnEventResponse> responses = new ArrayList<BurnEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
BurnEventResponse typedResponse = new BurnEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
responses.add(typedResponse);
}
return responses;
}
public static BurnEventResponse getBurnEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(BURN_EVENT, log);
BurnEventResponse typedResponse = new BurnEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
return typedResponse;
}
public Flowable<BurnEventResponse> burnEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getBurnEventFromLog(log));
}
public Flowable<BurnEventResponse> burnEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(BURN_EVENT));
return burnEventFlowable(filter);
}
public static List<CollectEventResponse> getCollectEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECT_EVENT, transactionReceipt);
ArrayList<CollectEventResponse> responses = new ArrayList<CollectEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
CollectEventResponse typedResponse = new CollectEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
responses.add(typedResponse);
}
return responses;
}
public static CollectEventResponse getCollectEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECT_EVENT, log);
CollectEventResponse typedResponse = new CollectEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
return typedResponse;
}
public Flowable<CollectEventResponse> collectEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getCollectEventFromLog(log));
}
public Flowable<CollectEventResponse> collectEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(COLLECT_EVENT));
return collectEventFlowable(filter);
}
public static List<CollectProtocolEventResponse> getCollectProtocolEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, transactionReceipt);
ArrayList<CollectProtocolEventResponse> responses = new ArrayList<CollectProtocolEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static CollectProtocolEventResponse getCollectProtocolEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, log);
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getCollectProtocolEventFromLog(log));
}
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(COLLECTPROTOCOL_EVENT));
return collectProtocolEventFlowable(filter);
}
public static List<FlashEventResponse> getFlashEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(FLASH_EVENT, transactionReceipt);
ArrayList<FlashEventResponse> responses = new ArrayList<FlashEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
FlashEventResponse typedResponse = new FlashEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static FlashEventResponse getFlashEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(FLASH_EVENT, log);
FlashEventResponse typedResponse = new FlashEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<FlashEventResponse> flashEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getFlashEventFromLog(log));
}
public Flowable<FlashEventResponse> flashEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(FLASH_EVENT));
return flashEventFlowable(filter);
}
public static List<IncreaseObservationCardinalityNextEventResponse> getIncreaseObservationCardinalityNextEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, transactionReceipt);
ArrayList<IncreaseObservationCardinalityNextEventResponse> responses = new ArrayList<IncreaseObservationCardinalityNextEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static IncreaseObservationCardinalityNextEventResponse getIncreaseObservationCardinalityNextEventFromLog(
Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, log);
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
typedResponse.log = log;
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getIncreaseObservationCardinalityNextEventFromLog(log));
}
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT));
return increaseObservationCardinalityNextEventFlowable(filter);
}
public static List<InitializeEventResponse> getInitializeEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INITIALIZE_EVENT, transactionReceipt);
ArrayList<InitializeEventResponse> responses = new ArrayList<InitializeEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
InitializeEventResponse typedResponse = new InitializeEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
responses.add(typedResponse);
}
return responses;
}
public static InitializeEventResponse getInitializeEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INITIALIZE_EVENT, log);
InitializeEventResponse typedResponse = new InitializeEventResponse();
typedResponse.log = log;
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
return typedResponse;
}
public Flowable<InitializeEventResponse> initializeEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getInitializeEventFromLog(log));
}
public Flowable<InitializeEventResponse> initializeEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(INITIALIZE_EVENT));
return initializeEventFlowable(filter);
}
public static List<MintEventResponse> getMintEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(MINT_EVENT, transactionReceipt);
ArrayList<MintEventResponse> responses = new ArrayList<MintEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
MintEventResponse typedResponse = new MintEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static MintEventResponse getMintEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(MINT_EVENT, log);
MintEventResponse typedResponse = new MintEventResponse();
typedResponse.log = log;
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<MintEventResponse> mintEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getMintEventFromLog(log));
}
public Flowable<MintEventResponse> mintEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(MINT_EVENT));
return mintEventFlowable(filter);
}
public static List<SetFeeProtocolEventResponse> getSetFeeProtocolEvents(
TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, transactionReceipt);
ArrayList<SetFeeProtocolEventResponse> responses = new ArrayList<SetFeeProtocolEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
responses.add(typedResponse);
}
return responses;
}
public static SetFeeProtocolEventResponse getSetFeeProtocolEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, log);
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
typedResponse.log = log;
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
return typedResponse;
}
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getSetFeeProtocolEventFromLog(log));
}
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(SETFEEPROTOCOL_EVENT));
return setFeeProtocolEventFlowable(filter);
}
public static List<SwapEventResponse> getSwapEvents(TransactionReceipt transactionReceipt) {
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SWAP_EVENT, transactionReceipt);
ArrayList<SwapEventResponse> responses = new ArrayList<SwapEventResponse>(valueList.size());
for (Contract.EventValuesWithLog eventValues : valueList) {
SwapEventResponse typedResponse = new SwapEventResponse();
typedResponse.log = eventValues.getLog();
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
responses.add(typedResponse);
}
return responses;
}
public static SwapEventResponse getSwapEventFromLog(Log log) {
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SWAP_EVENT, log);
SwapEventResponse typedResponse = new SwapEventResponse();
typedResponse.log = log;
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
return typedResponse;
}
public Flowable<SwapEventResponse> swapEventFlowable(EthFilter filter) {
return web3j.ethLogFlowable(filter).map(log -> getSwapEventFromLog(log));
}
public Flowable<SwapEventResponse> swapEventFlowable(DefaultBlockParameter startBlock,
DefaultBlockParameter endBlock) {
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(SWAP_EVENT));
return swapEventFlowable(filter);
}
public RemoteFunctionCall<TransactionReceipt> burn(BigInteger tickLower, BigInteger tickUpper,
BigInteger amount) {
final Function function = new Function(
FUNC_BURN,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> collect(String recipient, BigInteger tickLower,
BigInteger tickUpper, BigInteger amount0Requested, BigInteger amount1Requested) {
final Function function = new Function(
FUNC_COLLECT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> collectProtocol(String recipient,
BigInteger amount0Requested, BigInteger amount1Requested) {
final Function function = new Function(
FUNC_COLLECTPROTOCOL,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<String> factory() {
final Function function = new Function(FUNC_FACTORY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteFunctionCall<BigInteger> fee() {
final Function function = new Function(FUNC_FEE,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint24>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> feeGrowthGlobal0X128() {
final Function function = new Function(FUNC_FEEGROWTHGLOBAL0X128,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> feeGrowthGlobal1X128() {
final Function function = new Function(FUNC_FEEGROWTHGLOBAL1X128,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<TransactionReceipt> flash(String recipient, BigInteger amount0,
BigInteger amount1, byte[] data) {
final Function function = new Function(
FUNC_FLASH,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Uint256(amount0),
new org.web3j.abi.datatypes.generated.Uint256(amount1),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> increaseObservationCardinalityNext(
BigInteger observationCardinalityNext) {
final Function function = new Function(
FUNC_INCREASEOBSERVATIONCARDINALITYNEXT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint16(observationCardinalityNext)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<TransactionReceipt> initialize(BigInteger sqrtPriceX96) {
final Function function = new Function(
FUNC_INITIALIZE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceX96)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<BigInteger> liquidity() {
final Function function = new Function(FUNC_LIQUIDITY,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> maxLiquidityPerTick() {
final Function function = new Function(FUNC_MAXLIQUIDITYPERTICK,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<TransactionReceipt> mint(String recipient, BigInteger tickLower,
BigInteger tickUpper, BigInteger amount, byte[] data) {
final Function function = new Function(
FUNC_MINT,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
new org.web3j.abi.datatypes.generated.Uint128(amount),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>> observations(
BigInteger param0) {
final Function function = new Function(FUNC_OBSERVATIONS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint256(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint32>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple4<BigInteger, BigInteger, BigInteger, Boolean> call() throws
Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple4<BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(Boolean) results.get(3).getValue());
}
});
}
public RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>> observe(
List<BigInteger> secondsAgos) {
final Function function = new Function(FUNC_OBSERVE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.DynamicArray<org.web3j.abi.datatypes.generated.Uint32>(
org.web3j.abi.datatypes.generated.Uint32.class,
org.web3j.abi.Utils.typeMap(secondsAgos, org.web3j.abi.datatypes.generated.Uint32.class))),
Arrays.<TypeReference<?>>asList(new TypeReference<DynamicArray<Int56>>() {}, new TypeReference<DynamicArray<Uint160>>() {}));
return new RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>>(function,
new Callable<Tuple2<List<BigInteger>, List<BigInteger>>>() {
@Override
public Tuple2<List<BigInteger>, List<BigInteger>> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple2<List<BigInteger>, List<BigInteger>>(
convertToNative((List<Int56>) results.get(0).getValue()),
convertToNative((List<Uint160>) results.get(1).getValue()));
}
});
}
public RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>> positions(
byte[] param0) {
final Function function = new Function(FUNC_POSITIONS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Bytes32(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
return new RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>(function,
new Callable<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>() {
@Override
public Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger> call()
throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue());
}
});
}
public RemoteFunctionCall<Tuple2<BigInteger, BigInteger>> protocolFees() {
final Function function = new Function(FUNC_PROTOCOLFEES,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
return new RemoteFunctionCall<Tuple2<BigInteger, BigInteger>>(function,
new Callable<Tuple2<BigInteger, BigInteger>>() {
@Override
public Tuple2<BigInteger, BigInteger> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple2<BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue());
}
});
}
public RemoteFunctionCall<TransactionReceipt> setFeeProtocol(BigInteger feeProtocol0,
BigInteger feeProtocol1) {
final Function function = new Function(
FUNC_SETFEEPROTOCOL,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint8(feeProtocol0),
new org.web3j.abi.datatypes.generated.Uint8(feeProtocol1)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> slot0(
) {
final Function function = new Function(FUNC_SLOT0,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint8>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
) throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue(),
(BigInteger) results.get(5).getValue(),
(Boolean) results.get(6).getValue());
}
});
}
public RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>> snapshotCumulativesInside(
BigInteger tickLower, BigInteger tickUpper) {
final Function function = new Function(FUNC_SNAPSHOTCUMULATIVESINSIDE,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
new org.web3j.abi.datatypes.generated.Int24(tickUpper)),
Arrays.<TypeReference<?>>asList(new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}));
return new RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>>(function,
new Callable<Tuple3<BigInteger, BigInteger, BigInteger>>() {
@Override
public Tuple3<BigInteger, BigInteger, BigInteger> call() throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple3<BigInteger, BigInteger, BigInteger>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue());
}
});
}
public RemoteFunctionCall<TransactionReceipt> swap(String recipient, Boolean zeroForOne,
BigInteger amountSpecified, BigInteger sqrtPriceLimitX96, byte[] data) {
final Function function = new Function(
FUNC_SWAP,
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
new org.web3j.abi.datatypes.Bool(zeroForOne),
new org.web3j.abi.datatypes.generated.Int256(amountSpecified),
new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceLimitX96),
new org.web3j.abi.datatypes.DynamicBytes(data)),
Collections.<TypeReference<?>>emptyList());
return executeRemoteCallTransaction(function);
}
public RemoteFunctionCall<BigInteger> tickBitmap(BigInteger param0) {
final Function function = new Function(FUNC_TICKBITMAP,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int16(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<BigInteger> tickSpacing() {
final Function function = new Function(FUNC_TICKSPACING,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Int24>() {}));
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
}
public RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> ticks(
BigInteger param0) {
final Function function = new Function(FUNC_TICKS,
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(param0)),
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Int128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}, new TypeReference<Bool>() {}));
return new RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
new Callable<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
@Override
public Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
) throws Exception {
List<Type> results = executeCallMultipleValueReturn(function);
return new Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
(BigInteger) results.get(0).getValue(),
(BigInteger) results.get(1).getValue(),
(BigInteger) results.get(2).getValue(),
(BigInteger) results.get(3).getValue(),
(BigInteger) results.get(4).getValue(),
(BigInteger) results.get(5).getValue(),
(BigInteger) results.get(6).getValue(),
(Boolean) results.get(7).getValue());
}
});
}
public RemoteFunctionCall<String> token0() {
final Function function = new Function(FUNC_TOKEN0,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
public RemoteFunctionCall<String> token1() {
final Function function = new Function(FUNC_TOKEN1,
Arrays.<Type>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
return executeRemoteCallSingleValueReturn(function, String.class);
}
@Deprecated
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
BigInteger gasPrice, BigInteger gasLimit) {
return new UniswapV3Pool(contractAddress, web3j, credentials, gasPrice, gasLimit);
}
@Deprecated
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
return new UniswapV3Pool(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
}
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
ContractGasProvider contractGasProvider) {
return new UniswapV3Pool(contractAddress, web3j, credentials, contractGasProvider);
}
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
return new UniswapV3Pool(contractAddress, web3j, transactionManager, contractGasProvider);
}
public static class BurnEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public BigInteger amount;
public BigInteger amount0;
public BigInteger amount1;
}
public static class CollectEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
}
public static class CollectProtocolEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
}
public static class FlashEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
public BigInteger paid0;
public BigInteger paid1;
}
public static class IncreaseObservationCardinalityNextEventResponse extends BaseEventResponse {
public BigInteger observationCardinalityNextOld;
public BigInteger observationCardinalityNextNew;
}
public static class InitializeEventResponse extends BaseEventResponse {
public BigInteger sqrtPriceX96;
public BigInteger tick;
}
public static class MintEventResponse extends BaseEventResponse {
public String owner;
public BigInteger tickLower;
public BigInteger tickUpper;
public String sender;
public BigInteger amount;
public BigInteger amount0;
public BigInteger amount1;
}
public static class SetFeeProtocolEventResponse extends BaseEventResponse {
public BigInteger feeProtocol0Old;
public BigInteger feeProtocol1Old;
public BigInteger feeProtocol0New;
public BigInteger feeProtocol1New;
}
public static class SwapEventResponse extends BaseEventResponse {
public String sender;
public String recipient;
public BigInteger amount0;
public BigInteger amount1;
public BigInteger sqrtPriceX96;
public BigInteger liquidity;
public BigInteger tick;
}
}

View File

@@ -2,4 +2,11 @@ package stream.dto;
public class AddressId extends ChainId { public class AddressId extends ChainId {
public String address; public String address;
public AddressId() {}
public AddressId(int chainId, String address) {
super(chainId);
this.address = address;
}
} }

View File

@@ -12,4 +12,9 @@ public class BlockHash extends BlockId {
public Object getId() { public Object getId() {
return this.hash; return this.hash;
} }
@Override
public String toString() {
return this.hash;
}
} }

View File

@@ -12,4 +12,9 @@ public class BlockNumber extends BlockId {
public Object getId() { public Object getId() {
return this.number; return this.number;
} }
@Override
public String toString() {
return String.valueOf(this.number);
}
} }

View File

@@ -4,4 +4,10 @@ import java.io.Serializable;
public class ChainId implements Serializable { public class ChainId implements Serializable {
public int chainId; public int chainId;
public ChainId() {}
public ChainId(int chainId) {
this.chainId = chainId;
}
} }

View File

@@ -7,6 +7,8 @@ import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.Type; import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String; import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.*; import org.web3j.abi.datatypes.generated.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serial; import java.io.Serial;
import java.io.Serializable; import java.io.Serializable;
@@ -20,6 +22,8 @@ import static stream.io.EthUtils.keccak;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
public class ElaboratedEventType<T extends EventLog> extends EventType { public class ElaboratedEventType<T extends EventLog> extends EventType {
private static final Logger LOG = LoggerFactory.getLogger(ElaboratedEventType.class);
public transient String name; public transient String name;
public transient List<String> paramNames; public transient List<String> paramNames;
@@ -63,7 +67,9 @@ public class ElaboratedEventType<T extends EventLog> extends EventType {
int dataValuesIndex = 0; int dataValuesIndex = 0;
int topicsIndex = 1; // the first topic is the event signature int topicsIndex = 1; // the first topic is the event signature
List<Object> args = new ArrayList<>(); List<Object> args = new ArrayList<>();
for (TypeReference<Type> paramType : this.paramTypes) { for (int i = 0; i < this.paramTypes.size(); i++) {
TypeReference<Type> paramType = this.paramTypes.get(i);
String paramName = this.paramNames.get(i);
Object value; Object value;
if (paramType.isIndexed()) { if (paramType.isIndexed()) {
String encoded = topics.get(topicsIndex++); String encoded = topics.get(topicsIndex++);

View File

@@ -1,6 +1,6 @@
package stream.dto; package stream.dto;
enum Exchange { public enum Exchange {
UNISWAP_V2, UNISWAP_V2,
UNISWAP_V3, UNISWAP_V3,
UNISWAP_V4, UNISWAP_V4,

View File

@@ -0,0 +1,56 @@
package stream.dto;
import java.math.BigDecimal;
public class OHLCCandle {
private final String pool;
private final String token0;
private final String token1;
private final long windowStart;
private final long windowEnd;
private final BigDecimal open;
private final BigDecimal high;
private final BigDecimal low;
private final BigDecimal close;
private final BigDecimal volume;
private final int tradeCount;
public OHLCCandle(String pool, String token0, String token1,
long windowStart, long windowEnd,
BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close,
BigDecimal volume, int tradeCount) {
this.pool = pool;
this.token0 = token0;
this.token1 = token1;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.open = open;
this.high = high;
this.low = low;
this.close = close;
this.volume = volume;
this.tradeCount = tradeCount;
}
// Getters
public String getPool() { return pool; }
public String getToken0() { return token0; }
public String getToken1() { return token1; }
public long getWindowStart() { return windowStart; }
public long getWindowEnd() { return windowEnd; }
public BigDecimal getOpen() { return open; }
public BigDecimal getHigh() { return high; }
public BigDecimal getLow() { return low; }
public BigDecimal getClose() { return close; }
public BigDecimal getVolume() { return volume; }
public int getTradeCount() { return tradeCount; }
@Override
public String toString() {
return String.format("OHLC[%s] %s/%s %d-%d: O=%.4f H=%.4f L=%.4f C=%.4f V=%.4f Trades=%d",
pool.substring(0, 8) + "...",
token0.substring(0, 6), token1.substring(0, 6),
windowStart, windowEnd,
open, high, low, close, volume, tradeCount);
}
}

View File

@@ -8,11 +8,35 @@ public class Swap extends ChainId {
@Serial @Serial
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
Long time; // milliseconds public Long time; // milliseconds
Exchange exchange; public Exchange exchange;
String pool; // address public String pool; // address
String takerAsset; // token address public String takerAsset; // token address
String makerAsset; // token address public String makerAsset; // token address
BigDecimal amount; // positive means the taker bought; negative means the taker sold. public BigDecimal amountIn; // positive means the taker bought; negative means the taker sold.
BigDecimal price; public BigDecimal amountOut; // positive means the taker bought; negative means the taker sold.
public BigDecimal price;
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
BigDecimal amountIn, BigDecimal amountOut, BigDecimal price) {
super(chainId);
this.time = time;
this.exchange = exchange;
this.pool = pool;
this.takerAsset = takerAsset;
this.makerAsset = makerAsset;
this.amountIn = amountIn;
this.amountOut = amountOut;
this.price = price; // Total trade value
}
// Getter methods
public Long getTime() { return time; }
public Exchange getExchange() { return exchange; }
public String getPool() { return pool; }
public String getTakerAsset() { return takerAsset; }
public String getMakerAsset() { return makerAsset; }
public BigDecimal getAmountIn() { return amountIn; }
public BigDecimal getAmountOut() { return amountOut; }
public BigDecimal getPrice() { return price; }
} }

View File

@@ -29,14 +29,13 @@ public class SwapEventLog extends EventLog implements Serializable {
public String sender; public String sender;
public String recipient; public String recipient;
@BigInt
public BigInteger amount0; public BigInteger amount0;
@BigInt
public BigInteger amount1; public BigInteger amount1;
@BigInt
public BigInteger sqrtPriceX96; public BigInteger sqrtPriceX96;
@BigInt
public BigInteger liquidity; public BigInteger liquidity;
public int tick; public int tick;
public String getAddress() {
return this.address;
}
} }

View File

@@ -0,0 +1,59 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
public class SwapEventWithTokenIds implements Serializable {
private SwapEventLog swapEvent;
private String token0Address;
private String token1Address;
private BigInteger timestamp;
public SwapEventWithTokenIds() {
}
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address) {
this.swapEvent = swapEvent;
this.token0Address = token0Address;
this.token1Address = token1Address;
}
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address, BigInteger timestamp) {
this.swapEvent = swapEvent;
this.token0Address = token0Address;
this.token1Address = token1Address;
this.timestamp = timestamp;
}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
public void setSwapEvent(SwapEventLog swapEvent) {
this.swapEvent = swapEvent;
}
public String getToken0Address() {
return token0Address;
}
public void setToken0Address(String token0Address) {
this.token0Address = token0Address;
}
public String getToken1Address() {
return token1Address;
}
public void setToken1Address(String token1Address) {
this.token1Address = token1Address;
}
public BigInteger getTimestamp() {
return timestamp;
}
public void setTimestamp(BigInteger timestamp) {
this.timestamp = timestamp;
}
}

View File

@@ -0,0 +1,42 @@
package stream.dto;
import java.io.Serializable;
public class SwapEventWithTokenMetadata implements Serializable {
private SwapEventWithTokenIds swapEvent;
private Token token0;
private Token token1;
public SwapEventWithTokenMetadata() {
}
public SwapEventWithTokenMetadata(SwapEventWithTokenIds swapEvent, Token token0, Token token1) {
this.swapEvent = swapEvent;
this.token0 = token0;
this.token1 = token1;
}
public SwapEventWithTokenIds getSwapEvent() {
return swapEvent;
}
public void setSwapEvent(SwapEventWithTokenIds swapEvent) {
this.swapEvent = swapEvent;
}
public Token getToken0() {
return token0;
}
public void setToken0(Token token0) {
this.token0 = token0;
}
public Token getToken1() {
return token1;
}
public void setToken1(Token token1) {
this.token1 = token1;
}
}

View File

@@ -0,0 +1,24 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
public class SwapWithTimestamp implements Serializable {
private static final long serialVersionUID = 1L;
private final SwapEventLog swapEvent;
private final BigInteger timestamp;
public SwapWithTimestamp(SwapEventLog swapEvent, BigInteger timestamp) {
this.swapEvent = swapEvent;
this.timestamp = timestamp;
}
public SwapEventLog getSwapEvent() {
return swapEvent;
}
public BigInteger getTimestamp() {
return timestamp;
}
}

View File

@@ -6,7 +6,22 @@ public class Token extends AddressId {
@Serial @Serial
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
String name; public String name;
String symbol; public String symbol;
int decimals; public int decimals;
@SuppressWarnings("unused")
public Token() {}
public Token(int chainId, String address, String name, String symbol, int decimals) {
super(chainId, address);
this.name = name;
this.symbol = symbol;
this.decimals = decimals;
}
@Override
public String toString() {
return String.format("Token[%s \"%s\" (%s) .%d]", this.address, this.name, this.symbol, this.decimals);
}
} }

View File

@@ -0,0 +1,69 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.core.methods.response.EthBlock;
import stream.dto.BlockHash;
import stream.dto.BlockId;
import stream.dto.BlockNumber;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
private transient Web3j w3;
private static final AtomicInteger activeOperations = new AtomicInteger(0);
private static final AtomicInteger totalOperations = new AtomicInteger(0);
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
}
@Override
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
int active = activeOperations.incrementAndGet();
int total = totalOperations.incrementAndGet();
if (total % 100 == 0) {
log.info("BlockElaborator - Total operations: {}, Active operations: {}", total, active);
}
CompletableFuture.supplyAsync(() -> {
try {
return getBlock(blockId);
} catch (Exception e) {
log.error("Failed to get block {} on chain {}", blockId, e);
throw new RuntimeException("Error processing block " + blockId, e);
}
}).thenAccept(result -> {
activeOperations.decrementAndGet();
resultFuture.complete(Collections.singleton(result));
}).exceptionally(throwable -> {
activeOperations.decrementAndGet();
resultFuture.completeExceptionally(throwable);
return null;
});
}
private EthBlock getBlock(BlockId id) {
try {
return (id instanceof BlockNumber) ?
w3.ethGetBlockByNumber(new DefaultBlockParameterNumber(((BlockNumber) id).number), false).sendAsync().get() :
w3.ethGetBlockByHash(((BlockHash) id).hash, false).sendAsync().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,80 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.core.methods.response.EthBlock;
import stream.dto.BlockHash;
import stream.dto.SwapEventLog;
import stream.dto.SwapWithTimestamp;
import java.math.BigInteger;
import java.util.Collections;
public class BlockTimestampElaborator extends RichAsyncFunction<SwapEventLog, SwapWithTimestamp> {
private static final Logger log = LoggerFactory.getLogger(BlockTimestampElaborator.class);
private transient BlockElaborator blockElaborator;
@Override
public void open(OpenContext openContext) throws Exception {
blockElaborator = new BlockElaborator();
blockElaborator.setRuntimeContext(getRuntimeContext());
blockElaborator.open(openContext);
}
@Override
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<SwapWithTimestamp> resultFuture) {
try {
BlockHash blockHash = new BlockHash();
blockHash.hash = swapEvent.blockHash;
// Create a custom ResultFuture to handle the block elaboration result
ResultFuture<EthBlock> blockResultFuture = new ResultFuture<EthBlock>() {
@Override
public void complete(java.util.Collection<EthBlock> result) {
try {
if (!result.isEmpty()) {
EthBlock ethBlock = result.iterator().next();
BigInteger timestamp = ethBlock.getBlock().getTimestamp();
SwapWithTimestamp swapWithTimestamp = new SwapWithTimestamp(swapEvent, timestamp);
resultFuture.complete(Collections.singleton(swapWithTimestamp));
} else {
log.error("No block found for block hash {}", swapEvent.blockHash);
resultFuture.completeExceptionally(new RuntimeException("No block found"));
}
} catch (Exception e) {
log.error("Failed to get timestamp for block {} of swap in tx {}",
swapEvent.blockHash, swapEvent.transactionHash, e);
resultFuture.completeExceptionally(new RuntimeException("Error getting block timestamp", e));
}
}
@Override
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<EthBlock> collectionSupplier) {
try {
complete(collectionSupplier.get());
} catch (Exception e) {
completeExceptionally(e);
}
}
@Override
public void completeExceptionally(Throwable error) {
log.error("Failed to get block {} for swap in tx {}",
swapEvent.blockHash, swapEvent.transactionHash, error);
resultFuture.completeExceptionally(error);
}
};
// Invoke the block elaborator asynchronously
blockElaborator.asyncInvoke(blockHash, blockResultFuture);
} catch (Exception e) {
log.error("Error in BlockTimestampElaborator for swap in tx {}",
swapEvent.transactionHash, e);
resultFuture.completeExceptionally(e);
}
}
}

View File

@@ -1,7 +1,9 @@
package stream.io; package stream.io;
import java.util.HexFormat;
import org.bouncycastle.jcajce.provider.digest.Keccak; import org.bouncycastle.jcajce.provider.digest.Keccak;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
public class EthUtils { public class EthUtils {
public static boolean isValidAddress(String address) { public static boolean isValidAddress(String address) {
@@ -35,6 +37,6 @@ public class EthUtils {
public static String keccak(String message) { public static String keccak(String message) {
byte[] hash = new Keccak.Digest256().digest(message.getBytes()); byte[] hash = new Keccak.Digest256().digest(message.getBytes());
return "0x"+ByteUtils.toHexString(hash); return "0x" + HexFormat.of().formatHex(hash);
} }
} }

View File

@@ -0,0 +1,34 @@
package stream.io;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.JedisPooled;
import java.time.Duration;
import java.util.Map;
public class JedisClient {
protected static volatile JedisPooled jedis = null;
private static final Object jedisLock = new Object();
static public JedisPooled get(Map<String, String> params) {
if (jedis == null) {
synchronized (jedisLock) {
String url = params.getOrDefault("redis_url", "http://localhost:6379");
int connections = Integer.parseInt(params.getOrDefault("redis_connections", "8"));
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(connections);
poolConfig.setMaxIdle(connections);
poolConfig.setMinIdle(0);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWait(Duration.ofSeconds(5));
// Enables sending a PING command periodically while the connection is idle.
poolConfig.setTestWhileIdle(true);
// controls the period between checks for idle connections in the pool
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(10));
jedis = new JedisPooled(poolConfig, url);
}
}
return jedis;
}
}

View File

@@ -0,0 +1,81 @@
package stream.io;
import stream.dto.SwapWithTimestamp;
import stream.dto.SwapEventWithTokenIds;
import stream.contract.UniswapV3Pool;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.web3j.crypto.Credentials;
import org.web3j.tx.gas.DefaultGasProvider;
import java.util.concurrent.CompletableFuture;
import java.util.Collections;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class PoolTokenIdElaborator extends RichAsyncFunction<SwapWithTimestamp, SwapEventWithTokenIds> {
private static final Logger log = LoggerFactory.getLogger(PoolTokenIdElaborator.class);
private transient Web3j web3j;
private transient Credentials credentials;
private transient DefaultGasProvider gasProvider;
@Override
public void open(OpenContext openContext) throws Exception {
super.open(openContext);
// Get RPC URL from job parameters
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
// TODO: Get from configuration if needed
// Initialize Web3j
this.web3j = Web3j.build(new HttpService(rpcUrl));
// Dummy credentials for read-only operations
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
// Default gas provider
this.gasProvider = new DefaultGasProvider();
}
@Override
public void asyncInvoke(SwapWithTimestamp swapWithTimestamp, ResultFuture<SwapEventWithTokenIds> resultFuture) throws Exception {
CompletableFuture.supplyAsync(() -> {
try {
// Load the pool contract
UniswapV3Pool pool = UniswapV3Pool.load(
swapWithTimestamp.getSwapEvent().getAddress(), // Pool address from the event
web3j,
credentials,
gasProvider
);
// Get token addresses
String token0 = pool.token0().send();
String token1 = pool.token1().send();
// Create enriched event with timestamp
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), token0, token1, swapWithTimestamp.getTimestamp());
} catch (Exception e) {
log.error("Error fetching pool tokens for swap in pool {}",
swapWithTimestamp.getSwapEvent().getAddress(), e);
// Return original without enrichment but with timestamp
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), null, null, swapWithTimestamp.getTimestamp());
}
}).thenAccept(enriched -> {
resultFuture.complete(Collections.singletonList(enriched));
});
}
@Override
public void close() throws Exception {
if (web3j != null) {
web3j.shutdown();
}
}
}

View File

@@ -0,0 +1,89 @@
package stream.io;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.*;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.math.RoundingMode;
public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata, Swap> {
private static final Logger log = LoggerFactory.getLogger(SwapElaborator.class);
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
@Override
public Swap map(SwapEventWithTokenMetadata swapWithMetadata) throws Exception {
SwapEventWithTokenIds event = swapWithMetadata.getSwapEvent();
SwapEventLog swapLog = event.getSwapEvent();
Token token0 = swapWithMetadata.getToken0();
Token token1 = swapWithMetadata.getToken1();
// Use timestamp from block elaboration and set exchange as UNISWAP_V3
Long time = event.getTimestamp().longValue();
Exchange exchange = Exchange.UNISWAP_V3;
// Pool address
String pool = swapLog.address;
// Get sqrtPriceX96 and calculate actual price
BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96);
BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT);
BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT);
// Adjust price for decimals (price is token1/token0)
int decimalDiff = token0.decimals - token1.decimals;
BigDecimal adjustedPrice;
if (decimalDiff >= 0) {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff);
adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT);
} else {
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
}
// Determine which token is in and which is out based on amount signs
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
String takerAsset;
String makerAsset;
BigDecimal amountIn;
BigDecimal amountOut;
BigDecimal finalPrice;
if (isToken0In) {
// User is sending token0, receiving token1
takerAsset = event.getToken0Address();
makerAsset = event.getToken1Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount0.abs())
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
amountOut = new BigDecimal(swapLog.amount1)
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
// Price is how much token1 you get per token0
finalPrice = adjustedPrice;
} else {
// User is sending token1, receiving token0
takerAsset = event.getToken1Address();
makerAsset = event.getToken0Address();
// Convert amounts to human-readable format using decimals
amountIn = new BigDecimal(swapLog.amount1.abs())
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
amountOut = new BigDecimal(swapLog.amount0)
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
// We need to calculate price as token1/token0 to be consistent with token0 as quote currency
// Currently adjustedPrice = token1/token0, so we keep it as is
// This ensures price always represents how many token0 (quote) per 1 token1 (base)
// For example, in WETH/USDC: price = 2000 means 1 WETH costs 2000 USDC
finalPrice = adjustedPrice;
}
// Pass both amountIn and amountOut to constructor with unit price
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
}
}

View File

@@ -0,0 +1,66 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.tx.ReadonlyTransactionManager;
import org.web3j.tx.gas.DefaultGasProvider;
import stream.contract.ERC20;
import stream.dto.AddressId;
import stream.dto.Token;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TokenElaborator extends RichAsyncFunction<AddressId, Token> {
private static final Logger log = LoggerFactory.getLogger(TokenElaborator.class);
private transient Web3j w3;
private DefaultGasProvider gasProvider;
private ReadonlyTransactionManager transactionManager;
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
gasProvider = new DefaultGasProvider();
transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234");
}
@Override
public void asyncInvoke(AddressId address, final ResultFuture<Token> resultFuture) {
log.info("Starting token processing for address: {} on chain: {}", address.address, address.chainId);
CompletableFuture.supplyAsync(() -> {
try {
return processToken(address);
} catch (Exception e) {
log.error("Failed to process token: {} on chain: {}", address.address, address.chainId, e);
throw new RuntimeException("Error processing token: " + address, e);
}
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
}
private Token processToken(AddressId address) {
String name;
String symbol;
int decimals;
ERC20 contract = ERC20.load(address.address, w3, transactionManager, gasProvider);
try {
CompletableFuture<BigInteger> decimalsAsync = contract.decimals().sendAsync();
CompletableFuture<String> nameAsync = contract.name().sendAsync();
CompletableFuture<String> symbolAsync = contract.symbol().sendAsync();
decimals = decimalsAsync.get().intValue();
name = nameAsync.get();
symbol = symbolAsync.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return new Token(address.chainId, address.address, name, symbol, decimals);
}
}

View File

@@ -0,0 +1,50 @@
package stream.io;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.config.StreamingDefaults;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.MODERN_TLS;
public class Web3Client {
private static final Logger log = LoggerFactory.getLogger(Web3Client.class);
private static volatile Web3j w3 = null;
private static final Object w3Lock = new Object();
static public Web3j get(Map<String, String> params) {
if (w3==null) {
synchronized (w3Lock) {
log.info("Initializing Web3 client");
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", String.valueOf(StreamingDefaults.MAX_IDLE_CONNECTIONS)));
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", String.valueOf(StreamingDefaults.KEEP_ALIVE_MINUTES)));
Duration timeout = Duration.ofSeconds(30);
ConnectionPool connectionPool = new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES);
log.info("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
url, maxIdleConnections, keepAlive, timeout.getSeconds());
var httpClient = new OkHttpClient.Builder()
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
.connectionPool(connectionPool)
.build();
w3 = Web3j.build(new HttpService(url, httpClient));
log.info("Web3 client initialized successfully");
}
}
return w3;
}
}

View File

@@ -0,0 +1,41 @@
package stream.ohlc;
import java.math.BigDecimal;
public class OHLCAccumulator {
public String pool = null;
public String token0 = null; // Added
public String token1 = null; // Added
public BigDecimal open = null;
public BigDecimal high = null;
public BigDecimal low = null;
public BigDecimal close = null;
public BigDecimal volume = BigDecimal.ZERO;
public int tradeCount = 0;
public long firstTradeTime = 0;
public long lastTradeTime = 0;
public OHLCAccumulator() {}
public void updatePrice(BigDecimal price, long timestamp) {
if (open == null) {
open = price;
high = price;
low = price;
firstTradeTime = timestamp;
} else {
high = high.max(price);
low = low.min(price);
}
close = price;
lastTradeTime = timestamp;
}
public void addVolume(BigDecimal amount) {
volume = volume.add(amount);
}
public void incrementTradeCount() {
tradeCount++;
}
}

View File

@@ -0,0 +1,98 @@
package stream.ohlc;
import org.apache.flink.api.common.functions.AggregateFunction;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
import java.math.BigDecimal;
public class OHLCAggregator implements AggregateFunction<Swap, OHLCAccumulator, OHLCCandle> {
private final long windowSize = 60; // 1 minute in seconds
@Override
public OHLCAccumulator createAccumulator() {
return new OHLCAccumulator();
}
@Override
public OHLCAccumulator add(Swap swap, OHLCAccumulator accumulator) {
// Initialize pool and tokens on first swap
if (accumulator.pool == null) {
accumulator.pool = swap.getPool();
// Store tokens in consistent order (you could sort by address)
accumulator.token0 = swap.getTakerAsset();
accumulator.token1 = swap.getMakerAsset();
}
// Update OHLC prices
accumulator.updatePrice(swap.getPrice(), swap.getTime());
// Calculate volume (using the taker's input amount as volume)
BigDecimal volume = swap.getAmountIn().abs();
accumulator.addVolume(volume);
// Increment trade count
accumulator.incrementTradeCount();
return accumulator;
}
@Override
public OHLCAccumulator merge(OHLCAccumulator acc1, OHLCAccumulator acc2) {
OHLCAccumulator merged = new OHLCAccumulator();
// Merge pool and token info
merged.pool = acc1.pool != null ? acc1.pool : acc2.pool;
merged.token0 = acc1.token0 != null ? acc1.token0 : acc2.token0;
merged.token1 = acc1.token1 != null ? acc1.token1 : acc2.token1;
// Merge OHLC data
if (acc1.open != null && acc2.open != null) {
merged.open = acc1.firstTradeTime <= acc2.firstTradeTime ? acc1.open : acc2.open;
merged.close = acc1.lastTradeTime >= acc2.lastTradeTime ? acc1.close : acc2.close;
merged.high = acc1.high.max(acc2.high);
merged.low = acc1.low.min(acc2.low);
merged.firstTradeTime = Math.min(acc1.firstTradeTime, acc2.firstTradeTime);
merged.lastTradeTime = Math.max(acc1.lastTradeTime, acc2.lastTradeTime);
} else if (acc1.open != null) {
merged.open = acc1.open;
merged.close = acc1.close;
merged.high = acc1.high;
merged.low = acc1.low;
merged.firstTradeTime = acc1.firstTradeTime;
merged.lastTradeTime = acc1.lastTradeTime;
} else if (acc2.open != null) {
merged.open = acc2.open;
merged.close = acc2.close;
merged.high = acc2.high;
merged.low = acc2.low;
merged.firstTradeTime = acc2.firstTradeTime;
merged.lastTradeTime = acc2.lastTradeTime;
}
merged.volume = acc1.volume.add(acc2.volume);
merged.tradeCount = acc1.tradeCount + acc2.tradeCount;
return merged;
}
@Override
public OHLCCandle getResult(OHLCAccumulator accumulator) {
long windowStart = (accumulator.firstTradeTime / windowSize) * windowSize;
long windowEnd = windowStart + windowSize;
return new OHLCCandle(
accumulator.pool,
accumulator.token0,
accumulator.token1,
windowStart,
windowEnd,
accumulator.open,
accumulator.high,
accumulator.low,
accumulator.close,
accumulator.volume,
accumulator.tradeCount
);
}
}

View File

@@ -0,0 +1,26 @@
package stream.ohlc;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import java.time.Duration;
import stream.dto.Swap;
import stream.dto.OHLCCandle;
public class OHLCPipeline {
public DataStream<OHLCCandle> createOHLCStream(DataStream<Swap> swapStream) {
return swapStream
// NO watermarks needed for processing-time windows
.keyBy(Swap::getPool)
// Use 1-minute processing-time windows
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
// Simply use the aggregator
.aggregate(new OHLCAggregator())
// Filter out empty windows
.filter(candle -> candle.getTradeCount() > 0);
}
}