Compare commits
8 Commits
feature/to
...
04d441d6b1
| Author | SHA1 | Date | |
|---|---|---|---|
| 04d441d6b1 | |||
|
|
00ae425700 | ||
|
|
e58fdc0b06 | ||
|
|
7449b4b4b1 | ||
|
|
6829053d94 | ||
|
|
3a64f5630c | ||
|
|
9f65c2e850 | ||
|
|
8c672f250e |
4
.mvn/jvm.config
Normal file
4
.mvn/jvm.config
Normal file
@@ -0,0 +1,4 @@
|
||||
--add-opens java.base/java.lang=ALL-UNNAMED
|
||||
--add-opens java.base/java.lang.reflect=ALL-UNNAMED
|
||||
--add-opens java.base/java.net=ALL-UNNAMED
|
||||
--add-opens java.base/java.util=ALL-UNNAMED
|
||||
35
pom.xml
35
pom.xml
@@ -35,9 +35,11 @@ under the License.
|
||||
<scala.binary.version>2.12</scala.binary.version>
|
||||
<flink.version>2.1.0</flink.version>
|
||||
<log4j.version>2.24.1</log4j.version>
|
||||
<web3j.version>5.0.0</web3j.version>
|
||||
<web3j.version>5.0.1</web3j.version>
|
||||
<httpclient5.version>5.3.1</httpclient5.version>
|
||||
<jsonrpc4j.version>1.6</jsonrpc4j.version>
|
||||
<jedis.version>6.1.0</jedis.version>
|
||||
<jackson.version>2.17.1</jackson.version>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
@@ -55,7 +57,23 @@ under the License.
|
||||
</repositories>
|
||||
|
||||
<dependencies>
|
||||
<!-- Apache Flink dependencies -->
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Apache Flink dependencies -->
|
||||
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
@@ -132,7 +150,14 @@ under the License.
|
||||
<version>${log4j.version}</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>redis.clients</groupId>
|
||||
<artifactId>jedis</artifactId>
|
||||
<version>${jedis.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
@@ -143,8 +168,8 @@ under the License.
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.1</version>
|
||||
<configuration>
|
||||
<source>16</source>
|
||||
<target>16</target>
|
||||
<source>${target.java.version}</source>
|
||||
<target>${target.java.version}</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
|
||||
@@ -19,18 +19,24 @@
|
||||
package stream;
|
||||
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.api.common.functions.FlatMapFunction;
|
||||
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.util.ParameterTool;
|
||||
import org.apache.flink.util.Collector;
|
||||
import stream.dto.*;
|
||||
import stream.io.SwapEventEnricher;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
import stream.io.PoolElaborator;
|
||||
import stream.io.TokenElaborator;
|
||||
import stream.source.eventlog.EventLogSourceFactory;
|
||||
import stream.source.newheads.NewHeadsSourceFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -58,6 +64,7 @@ public class DataStreamJob {
|
||||
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
|
||||
|
||||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
env.getConfig().setGlobalJobParameters(parameters);
|
||||
// do not do this until considering how secrets are handled by flink
|
||||
// env.getConfig().setGlobalJobParameters(parameters);
|
||||
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
|
||||
@@ -67,6 +74,7 @@ public class DataStreamJob {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.enable(SerializationFeature.INDENT_OUTPUT);
|
||||
|
||||
|
||||
DataStream<ArbitrumOneBlock> arbitrumHeads = env
|
||||
.fromSource(
|
||||
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
|
||||
@@ -75,26 +83,65 @@ public class DataStreamJob {
|
||||
TypeInformation.of(ArbitrumOneBlock.class)
|
||||
);
|
||||
|
||||
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
|
||||
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
||||
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
||||
|
||||
// Create SwapEnricher to add token addresses and metadata
|
||||
SwapEventEnricher swapEnricher = new SwapEventEnricher(httpUri.toString());
|
||||
|
||||
// Enrich swap events with token addresses and metadata
|
||||
DataStream<EnrichedSwapEventLog> fullyEnrichedSwapStream = org.apache.flink.streaming.api.datastream.AsyncDataStream
|
||||
.unorderedWait(swapStream, swapEnricher, 10000, java.util.concurrent.TimeUnit.MILLISECONDS)
|
||||
.name("Swap Enricher");
|
||||
|
||||
// Print fully enriched swap events with all metadata
|
||||
fullyEnrichedSwapStream
|
||||
SingleOutputStreamOperator<ElaboratedSwapEvent> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
||||
swapStream,
|
||||
new PoolElaborator(),
|
||||
30,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
// Extract token addresses and elaborate with metadata
|
||||
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
|
||||
.flatMap(new FlatMapFunction<ElaboratedSwapEvent, AddressId>() {
|
||||
@Override
|
||||
public void flatMap(ElaboratedSwapEvent event, Collector<AddressId> collector) throws Exception {
|
||||
collector.collect(new AddressId(42161, event.getToken0Address()));
|
||||
collector.collect(new AddressId(42161, event.getToken1Address()));
|
||||
}
|
||||
});
|
||||
|
||||
// Elaborate tokens with metadata
|
||||
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
|
||||
tokenAddresses,
|
||||
new TokenElaborator(),
|
||||
120,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
// Print comprehensive swap event data with token metadata
|
||||
elaboratedSwapStream
|
||||
.map(event -> {
|
||||
try {
|
||||
return mapper.writeValueAsString(event);
|
||||
String json = mapper.writeValueAsString(event);
|
||||
log.info("SWAP EVENT - Pool: {} Block: {} TxHash: {} Token0: {} Token1: {} Amount0: {} Amount1: {} Tick: {}",
|
||||
event.getSwapEvent().address,
|
||||
event.getSwapEvent().blockNumber,
|
||||
event.getSwapEvent().transactionHash,
|
||||
event.getToken0Address(),
|
||||
event.getToken1Address(),
|
||||
event.getSwapEvent().amount0,
|
||||
event.getSwapEvent().amount1,
|
||||
event.getSwapEvent().tick);
|
||||
return json;
|
||||
} catch (Exception e) {
|
||||
return "Error converting enriched swap event to JSON: " + e.getMessage();
|
||||
return "Error converting elaborated swap event to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Fully Enriched Swap Event: ");
|
||||
.print("Swap Event: ");
|
||||
|
||||
// Print token metadata when available
|
||||
elaboratedTokens
|
||||
.map(token -> {
|
||||
log.info("TOKEN METADATA - Address: {} Name: {} Symbol: {} Decimals: {}",
|
||||
token.address, token.name, token.symbol, token.decimals);
|
||||
return token.toString();
|
||||
})
|
||||
.print("Token: ");
|
||||
|
||||
env.execute("Ethereum Block Stream");
|
||||
}
|
||||
|
||||
7
src/main/java/stream/contract/ArbitrumOne.java
Normal file
7
src/main/java/stream/contract/ArbitrumOne.java
Normal file
@@ -0,0 +1,7 @@
|
||||
package stream.contract;
|
||||
|
||||
public class ArbitrumOne {
|
||||
static public final int CHAIN_ID = 42161;
|
||||
|
||||
static public final String ADDR_USDC = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831";
|
||||
}
|
||||
263
src/main/java/stream/contract/ERC20.java
Normal file
263
src/main/java/stream/contract/ERC20.java
Normal file
@@ -0,0 +1,263 @@
|
||||
package stream.contract;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.web3j.abi.EventEncoder;
|
||||
import org.web3j.abi.TypeReference;
|
||||
import org.web3j.abi.datatypes.Address;
|
||||
import org.web3j.abi.datatypes.Event;
|
||||
import org.web3j.abi.datatypes.Function;
|
||||
import org.web3j.abi.datatypes.Type;
|
||||
import org.web3j.abi.datatypes.Utf8String;
|
||||
import org.web3j.abi.datatypes.generated.Uint256;
|
||||
import org.web3j.abi.datatypes.generated.Uint8;
|
||||
import org.web3j.crypto.Credentials;
|
||||
import org.web3j.protocol.Web3j;
|
||||
import org.web3j.protocol.core.DefaultBlockParameter;
|
||||
import org.web3j.protocol.core.RemoteCall;
|
||||
import org.web3j.protocol.core.methods.request.EthFilter;
|
||||
import org.web3j.protocol.core.methods.response.Log;
|
||||
import org.web3j.protocol.core.methods.response.TransactionReceipt;
|
||||
import org.web3j.tx.Contract;
|
||||
import org.web3j.tx.TransactionManager;
|
||||
import org.web3j.tx.gas.ContractGasProvider;
|
||||
|
||||
/**
|
||||
* <p>Auto generated code.
|
||||
* <p><strong>Do not modify!</strong>
|
||||
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
|
||||
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
|
||||
* <a href="https://github.com/hyperledger/web3j/tree/main/codegen">codegen module</a> to update.
|
||||
*
|
||||
* <p>Generated with web3j version 4.1.1.
|
||||
*/
|
||||
public class ERC20 extends Contract {
|
||||
private static final String BINARY = "Bin file was not provided";
|
||||
|
||||
public static final String FUNC_NAME = "name";
|
||||
|
||||
public static final String FUNC_APPROVE = "approve";
|
||||
|
||||
public static final String FUNC_TOTALSUPPLY = "totalSupply";
|
||||
|
||||
public static final String FUNC_TRANSFERFROM = "transferFrom";
|
||||
|
||||
public static final String FUNC_DECIMALS = "decimals";
|
||||
|
||||
public static final String FUNC_BALANCEOF = "balanceOf";
|
||||
|
||||
public static final String FUNC_SYMBOL = "symbol";
|
||||
|
||||
public static final String FUNC_TRANSFER = "transfer";
|
||||
|
||||
public static final String FUNC_ALLOWANCE = "allowance";
|
||||
|
||||
public static final Event TRANSFER_EVENT = new Event("Transfer",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
|
||||
;
|
||||
|
||||
public static final Event APPROVAL_EVENT = new Event("Approval",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
|
||||
;
|
||||
|
||||
@Deprecated
|
||||
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
|
||||
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
|
||||
}
|
||||
|
||||
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
|
||||
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
|
||||
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
|
||||
}
|
||||
|
||||
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
|
||||
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
|
||||
}
|
||||
|
||||
public RemoteCall<String> name() {
|
||||
final Function function = new Function(FUNC_NAME,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, String.class);
|
||||
}
|
||||
|
||||
public RemoteCall<TransactionReceipt> approve(String _spender, BigInteger _value) {
|
||||
final Function function = new Function(
|
||||
FUNC_APPROVE,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_spender),
|
||||
new org.web3j.abi.datatypes.generated.Uint256(_value)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteCall<BigInteger> totalSupply() {
|
||||
final Function function = new Function(FUNC_TOTALSUPPLY,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteCall<TransactionReceipt> transferFrom(String _from, String _to, BigInteger _value) {
|
||||
final Function function = new Function(
|
||||
FUNC_TRANSFERFROM,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_from),
|
||||
new org.web3j.abi.datatypes.Address(_to),
|
||||
new org.web3j.abi.datatypes.generated.Uint256(_value)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteCall<BigInteger> decimals() {
|
||||
final Function function = new Function(FUNC_DECIMALS,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteCall<BigInteger> balanceOf(String _owner) {
|
||||
final Function function = new Function(FUNC_BALANCEOF,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner)),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteCall<String> symbol() {
|
||||
final Function function = new Function(FUNC_SYMBOL,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, String.class);
|
||||
}
|
||||
|
||||
public RemoteCall<TransactionReceipt> transfer(String _to, BigInteger _value) {
|
||||
final Function function = new Function(
|
||||
FUNC_TRANSFER,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_to),
|
||||
new org.web3j.abi.datatypes.generated.Uint256(_value)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteCall<BigInteger> allowance(String _owner, String _spender) {
|
||||
final Function function = new Function(FUNC_ALLOWANCE,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner),
|
||||
new org.web3j.abi.datatypes.Address(_spender)),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public List<TransferEventResponse> getTransferEvents(TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(TRANSFER_EVENT, transactionReceipt);
|
||||
ArrayList<TransferEventResponse> responses = new ArrayList<TransferEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
TransferEventResponse typedResponse = new TransferEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public Flowable<TransferEventResponse> transferEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, TransferEventResponse>() {
|
||||
@Override
|
||||
public TransferEventResponse apply(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(TRANSFER_EVENT, log);
|
||||
TransferEventResponse typedResponse = new TransferEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Flowable<TransferEventResponse> transferEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(TRANSFER_EVENT));
|
||||
return transferEventFlowable(filter);
|
||||
}
|
||||
|
||||
public List<ApprovalEventResponse> getApprovalEvents(TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(APPROVAL_EVENT, transactionReceipt);
|
||||
ArrayList<ApprovalEventResponse> responses = new ArrayList<ApprovalEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public Flowable<ApprovalEventResponse> approvalEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, ApprovalEventResponse>() {
|
||||
@Override
|
||||
public ApprovalEventResponse apply(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(APPROVAL_EVENT, log);
|
||||
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Flowable<ApprovalEventResponse> approvalEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(APPROVAL_EVENT));
|
||||
return approvalEventFlowable(filter);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
|
||||
return new ERC20(contractAddress, web3j, credentials, gasPrice, gasLimit);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
|
||||
return new ERC20(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
|
||||
}
|
||||
|
||||
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
|
||||
return new ERC20(contractAddress, web3j, credentials, contractGasProvider);
|
||||
}
|
||||
|
||||
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
|
||||
return new ERC20(contractAddress, web3j, transactionManager, contractGasProvider);
|
||||
}
|
||||
|
||||
public static class TransferEventResponse {
|
||||
public Log log;
|
||||
|
||||
public String _from;
|
||||
|
||||
public String _to;
|
||||
|
||||
public BigInteger _value;
|
||||
}
|
||||
|
||||
public static class ApprovalEventResponse {
|
||||
public Log log;
|
||||
|
||||
public String _owner;
|
||||
|
||||
public String _spender;
|
||||
|
||||
public BigInteger _value;
|
||||
}
|
||||
}
|
||||
965
src/main/java/stream/contract/UniswapV3Pool.java
Normal file
965
src/main/java/stream/contract/UniswapV3Pool.java
Normal file
@@ -0,0 +1,965 @@
|
||||
package stream.contract;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import org.web3j.abi.EventEncoder;
|
||||
import org.web3j.abi.TypeReference;
|
||||
import org.web3j.abi.datatypes.Address;
|
||||
import org.web3j.abi.datatypes.Bool;
|
||||
import org.web3j.abi.datatypes.DynamicArray;
|
||||
import org.web3j.abi.datatypes.Event;
|
||||
import org.web3j.abi.datatypes.Function;
|
||||
import org.web3j.abi.datatypes.Type;
|
||||
import org.web3j.abi.datatypes.generated.Int128;
|
||||
import org.web3j.abi.datatypes.generated.Int24;
|
||||
import org.web3j.abi.datatypes.generated.Int256;
|
||||
import org.web3j.abi.datatypes.generated.Int56;
|
||||
import org.web3j.abi.datatypes.generated.Uint128;
|
||||
import org.web3j.abi.datatypes.generated.Uint16;
|
||||
import org.web3j.abi.datatypes.generated.Uint160;
|
||||
import org.web3j.abi.datatypes.generated.Uint24;
|
||||
import org.web3j.abi.datatypes.generated.Uint256;
|
||||
import org.web3j.abi.datatypes.generated.Uint32;
|
||||
import org.web3j.abi.datatypes.generated.Uint8;
|
||||
import org.web3j.crypto.Credentials;
|
||||
import org.web3j.protocol.Web3j;
|
||||
import org.web3j.protocol.core.DefaultBlockParameter;
|
||||
import org.web3j.protocol.core.RemoteFunctionCall;
|
||||
import org.web3j.protocol.core.methods.request.EthFilter;
|
||||
import org.web3j.protocol.core.methods.response.BaseEventResponse;
|
||||
import org.web3j.protocol.core.methods.response.Log;
|
||||
import org.web3j.protocol.core.methods.response.TransactionReceipt;
|
||||
import org.web3j.tuples.generated.Tuple2;
|
||||
import org.web3j.tuples.generated.Tuple3;
|
||||
import org.web3j.tuples.generated.Tuple4;
|
||||
import org.web3j.tuples.generated.Tuple5;
|
||||
import org.web3j.tuples.generated.Tuple7;
|
||||
import org.web3j.tuples.generated.Tuple8;
|
||||
import org.web3j.tx.Contract;
|
||||
import org.web3j.tx.TransactionManager;
|
||||
import org.web3j.tx.gas.ContractGasProvider;
|
||||
|
||||
/**
|
||||
* <p>Auto generated code.
|
||||
* <p><strong>Do not modify!</strong>
|
||||
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
|
||||
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
|
||||
* <a href="https://github.com/LFDT-web3j/web3j/tree/main/codegen">codegen module</a> to update.
|
||||
*
|
||||
* <p>Generated with web3j version 1.7.0.
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public class UniswapV3Pool extends Contract {
|
||||
public static final String BINARY = "Bin file was not provided";
|
||||
|
||||
public static final String FUNC_BURN = "burn";
|
||||
|
||||
public static final String FUNC_COLLECT = "collect";
|
||||
|
||||
public static final String FUNC_COLLECTPROTOCOL = "collectProtocol";
|
||||
|
||||
public static final String FUNC_FACTORY = "factory";
|
||||
|
||||
public static final String FUNC_FEE = "fee";
|
||||
|
||||
public static final String FUNC_FEEGROWTHGLOBAL0X128 = "feeGrowthGlobal0X128";
|
||||
|
||||
public static final String FUNC_FEEGROWTHGLOBAL1X128 = "feeGrowthGlobal1X128";
|
||||
|
||||
public static final String FUNC_FLASH = "flash";
|
||||
|
||||
public static final String FUNC_INCREASEOBSERVATIONCARDINALITYNEXT = "increaseObservationCardinalityNext";
|
||||
|
||||
public static final String FUNC_INITIALIZE = "initialize";
|
||||
|
||||
public static final String FUNC_LIQUIDITY = "liquidity";
|
||||
|
||||
public static final String FUNC_MAXLIQUIDITYPERTICK = "maxLiquidityPerTick";
|
||||
|
||||
public static final String FUNC_MINT = "mint";
|
||||
|
||||
public static final String FUNC_OBSERVATIONS = "observations";
|
||||
|
||||
public static final String FUNC_OBSERVE = "observe";
|
||||
|
||||
public static final String FUNC_POSITIONS = "positions";
|
||||
|
||||
public static final String FUNC_PROTOCOLFEES = "protocolFees";
|
||||
|
||||
public static final String FUNC_SETFEEPROTOCOL = "setFeeProtocol";
|
||||
|
||||
public static final String FUNC_SLOT0 = "slot0";
|
||||
|
||||
public static final String FUNC_SNAPSHOTCUMULATIVESINSIDE = "snapshotCumulativesInside";
|
||||
|
||||
public static final String FUNC_SWAP = "swap";
|
||||
|
||||
public static final String FUNC_TICKBITMAP = "tickBitmap";
|
||||
|
||||
public static final String FUNC_TICKSPACING = "tickSpacing";
|
||||
|
||||
public static final String FUNC_TICKS = "ticks";
|
||||
|
||||
public static final String FUNC_TOKEN0 = "token0";
|
||||
|
||||
public static final String FUNC_TOKEN1 = "token1";
|
||||
|
||||
public static final Event BURN_EVENT = new Event("Burn",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
|
||||
;
|
||||
|
||||
public static final Event COLLECT_EVENT = new Event("Collect",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>() {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
|
||||
;
|
||||
|
||||
public static final Event COLLECTPROTOCOL_EVENT = new Event("CollectProtocol",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
|
||||
;
|
||||
|
||||
public static final Event FLASH_EVENT = new Event("Flash",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
|
||||
;
|
||||
|
||||
public static final Event INCREASEOBSERVATIONCARDINALITYNEXT_EVENT = new Event("IncreaseObservationCardinalityNext",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}));
|
||||
;
|
||||
|
||||
public static final Event INITIALIZE_EVENT = new Event("Initialize",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}));
|
||||
;
|
||||
|
||||
public static final Event MINT_EVENT = new Event("Mint",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}, new TypeReference<Address>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Int24>(true) {}, new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}));
|
||||
;
|
||||
|
||||
public static final Event SETFEEPROTOCOL_EVENT = new Event("SetFeeProtocol",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}, new TypeReference<Uint8>() {}));
|
||||
;
|
||||
|
||||
public static final Event SWAP_EVENT = new Event("Swap",
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Int256>() {}, new TypeReference<Int256>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint128>() {}, new TypeReference<Int24>() {}));
|
||||
;
|
||||
|
||||
@Deprecated
|
||||
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice,
|
||||
BigInteger gasLimit) {
|
||||
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
|
||||
}
|
||||
|
||||
protected UniswapV3Pool(String contractAddress, Web3j web3j, Credentials credentials,
|
||||
ContractGasProvider contractGasProvider) {
|
||||
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
|
||||
BigInteger gasPrice, BigInteger gasLimit) {
|
||||
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
|
||||
}
|
||||
|
||||
protected UniswapV3Pool(String contractAddress, Web3j web3j, TransactionManager transactionManager,
|
||||
ContractGasProvider contractGasProvider) {
|
||||
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
|
||||
}
|
||||
|
||||
public static List<BurnEventResponse> getBurnEvents(TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(BURN_EVENT, transactionReceipt);
|
||||
ArrayList<BurnEventResponse> responses = new ArrayList<BurnEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
BurnEventResponse typedResponse = new BurnEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
|
||||
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static BurnEventResponse getBurnEventFromLog(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(BURN_EVENT, log);
|
||||
BurnEventResponse typedResponse = new BurnEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
|
||||
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<BurnEventResponse> burnEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getBurnEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<BurnEventResponse> burnEventFlowable(DefaultBlockParameter startBlock,
|
||||
DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(BURN_EVENT));
|
||||
return burnEventFlowable(filter);
|
||||
}
|
||||
|
||||
public static List<CollectEventResponse> getCollectEvents(
|
||||
TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECT_EVENT, transactionReceipt);
|
||||
ArrayList<CollectEventResponse> responses = new ArrayList<CollectEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
CollectEventResponse typedResponse = new CollectEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
|
||||
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static CollectEventResponse getCollectEventFromLog(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECT_EVENT, log);
|
||||
CollectEventResponse typedResponse = new CollectEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
|
||||
typedResponse.recipient = (String) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<CollectEventResponse> collectEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getCollectEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<CollectEventResponse> collectEventFlowable(DefaultBlockParameter startBlock,
|
||||
DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(COLLECT_EVENT));
|
||||
return collectEventFlowable(filter);
|
||||
}
|
||||
|
||||
public static List<CollectProtocolEventResponse> getCollectProtocolEvents(
|
||||
TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, transactionReceipt);
|
||||
ArrayList<CollectProtocolEventResponse> responses = new ArrayList<CollectProtocolEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static CollectProtocolEventResponse getCollectProtocolEventFromLog(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(COLLECTPROTOCOL_EVENT, log);
|
||||
CollectProtocolEventResponse typedResponse = new CollectProtocolEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getCollectProtocolEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<CollectProtocolEventResponse> collectProtocolEventFlowable(
|
||||
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(COLLECTPROTOCOL_EVENT));
|
||||
return collectProtocolEventFlowable(filter);
|
||||
}
|
||||
|
||||
public static List<FlashEventResponse> getFlashEvents(TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(FLASH_EVENT, transactionReceipt);
|
||||
ArrayList<FlashEventResponse> responses = new ArrayList<FlashEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
FlashEventResponse typedResponse = new FlashEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static FlashEventResponse getFlashEventFromLog(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(FLASH_EVENT, log);
|
||||
FlashEventResponse typedResponse = new FlashEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.paid0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
typedResponse.paid1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<FlashEventResponse> flashEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getFlashEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<FlashEventResponse> flashEventFlowable(DefaultBlockParameter startBlock,
|
||||
DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(FLASH_EVENT));
|
||||
return flashEventFlowable(filter);
|
||||
}
|
||||
|
||||
public static List<IncreaseObservationCardinalityNextEventResponse> getIncreaseObservationCardinalityNextEvents(
|
||||
TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, transactionReceipt);
|
||||
ArrayList<IncreaseObservationCardinalityNextEventResponse> responses = new ArrayList<IncreaseObservationCardinalityNextEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static IncreaseObservationCardinalityNextEventResponse getIncreaseObservationCardinalityNextEventFromLog(
|
||||
Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT, log);
|
||||
IncreaseObservationCardinalityNextEventResponse typedResponse = new IncreaseObservationCardinalityNextEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.observationCardinalityNextOld = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.observationCardinalityNextNew = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
|
||||
EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getIncreaseObservationCardinalityNextEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<IncreaseObservationCardinalityNextEventResponse> increaseObservationCardinalityNextEventFlowable(
|
||||
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(INCREASEOBSERVATIONCARDINALITYNEXT_EVENT));
|
||||
return increaseObservationCardinalityNextEventFlowable(filter);
|
||||
}
|
||||
|
||||
public static List<InitializeEventResponse> getInitializeEvents(
|
||||
TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(INITIALIZE_EVENT, transactionReceipt);
|
||||
ArrayList<InitializeEventResponse> responses = new ArrayList<InitializeEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
InitializeEventResponse typedResponse = new InitializeEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static InitializeEventResponse getInitializeEventFromLog(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(INITIALIZE_EVENT, log);
|
||||
InitializeEventResponse typedResponse = new InitializeEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<InitializeEventResponse> initializeEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getInitializeEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<InitializeEventResponse> initializeEventFlowable(
|
||||
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(INITIALIZE_EVENT));
|
||||
return initializeEventFlowable(filter);
|
||||
}
|
||||
|
||||
public static List<MintEventResponse> getMintEvents(TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(MINT_EVENT, transactionReceipt);
|
||||
ArrayList<MintEventResponse> responses = new ArrayList<MintEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
MintEventResponse typedResponse = new MintEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
|
||||
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static MintEventResponse getMintEventFromLog(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(MINT_EVENT, log);
|
||||
MintEventResponse typedResponse = new MintEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.tickLower = (BigInteger) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.tickUpper = (BigInteger) eventValues.getIndexedValues().get(2).getValue();
|
||||
typedResponse.sender = (String) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<MintEventResponse> mintEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getMintEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<MintEventResponse> mintEventFlowable(DefaultBlockParameter startBlock,
|
||||
DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(MINT_EVENT));
|
||||
return mintEventFlowable(filter);
|
||||
}
|
||||
|
||||
public static List<SetFeeProtocolEventResponse> getSetFeeProtocolEvents(
|
||||
TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, transactionReceipt);
|
||||
ArrayList<SetFeeProtocolEventResponse> responses = new ArrayList<SetFeeProtocolEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static SetFeeProtocolEventResponse getSetFeeProtocolEventFromLog(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SETFEEPROTOCOL_EVENT, log);
|
||||
SetFeeProtocolEventResponse typedResponse = new SetFeeProtocolEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.feeProtocol0Old = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.feeProtocol1Old = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.feeProtocol0New = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
typedResponse.feeProtocol1New = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getSetFeeProtocolEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<SetFeeProtocolEventResponse> setFeeProtocolEventFlowable(
|
||||
DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(SETFEEPROTOCOL_EVENT));
|
||||
return setFeeProtocolEventFlowable(filter);
|
||||
}
|
||||
|
||||
public static List<SwapEventResponse> getSwapEvents(TransactionReceipt transactionReceipt) {
|
||||
List<Contract.EventValuesWithLog> valueList = staticExtractEventParametersWithLog(SWAP_EVENT, transactionReceipt);
|
||||
ArrayList<SwapEventResponse> responses = new ArrayList<SwapEventResponse>(valueList.size());
|
||||
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||
SwapEventResponse typedResponse = new SwapEventResponse();
|
||||
typedResponse.log = eventValues.getLog();
|
||||
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
|
||||
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
|
||||
responses.add(typedResponse);
|
||||
}
|
||||
return responses;
|
||||
}
|
||||
|
||||
public static SwapEventResponse getSwapEventFromLog(Log log) {
|
||||
Contract.EventValuesWithLog eventValues = staticExtractEventParametersWithLog(SWAP_EVENT, log);
|
||||
SwapEventResponse typedResponse = new SwapEventResponse();
|
||||
typedResponse.log = log;
|
||||
typedResponse.sender = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||
typedResponse.recipient = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||
typedResponse.amount0 = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||
typedResponse.amount1 = (BigInteger) eventValues.getNonIndexedValues().get(1).getValue();
|
||||
typedResponse.sqrtPriceX96 = (BigInteger) eventValues.getNonIndexedValues().get(2).getValue();
|
||||
typedResponse.liquidity = (BigInteger) eventValues.getNonIndexedValues().get(3).getValue();
|
||||
typedResponse.tick = (BigInteger) eventValues.getNonIndexedValues().get(4).getValue();
|
||||
return typedResponse;
|
||||
}
|
||||
|
||||
public Flowable<SwapEventResponse> swapEventFlowable(EthFilter filter) {
|
||||
return web3j.ethLogFlowable(filter).map(log -> getSwapEventFromLog(log));
|
||||
}
|
||||
|
||||
public Flowable<SwapEventResponse> swapEventFlowable(DefaultBlockParameter startBlock,
|
||||
DefaultBlockParameter endBlock) {
|
||||
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||
filter.addSingleTopic(EventEncoder.encode(SWAP_EVENT));
|
||||
return swapEventFlowable(filter);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> burn(BigInteger tickLower, BigInteger tickUpper,
|
||||
BigInteger amount) {
|
||||
final Function function = new Function(
|
||||
FUNC_BURN,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
|
||||
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
|
||||
new org.web3j.abi.datatypes.generated.Uint128(amount)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> collect(String recipient, BigInteger tickLower,
|
||||
BigInteger tickUpper, BigInteger amount0Requested, BigInteger amount1Requested) {
|
||||
final Function function = new Function(
|
||||
FUNC_COLLECT,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
|
||||
new org.web3j.abi.datatypes.generated.Int24(tickLower),
|
||||
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
|
||||
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
|
||||
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> collectProtocol(String recipient,
|
||||
BigInteger amount0Requested, BigInteger amount1Requested) {
|
||||
final Function function = new Function(
|
||||
FUNC_COLLECTPROTOCOL,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
|
||||
new org.web3j.abi.datatypes.generated.Uint128(amount0Requested),
|
||||
new org.web3j.abi.datatypes.generated.Uint128(amount1Requested)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<String> factory() {
|
||||
final Function function = new Function(FUNC_FACTORY,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, String.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<BigInteger> fee() {
|
||||
final Function function = new Function(FUNC_FEE,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint24>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<BigInteger> feeGrowthGlobal0X128() {
|
||||
final Function function = new Function(FUNC_FEEGROWTHGLOBAL0X128,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<BigInteger> feeGrowthGlobal1X128() {
|
||||
final Function function = new Function(FUNC_FEEGROWTHGLOBAL1X128,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> flash(String recipient, BigInteger amount0,
|
||||
BigInteger amount1, byte[] data) {
|
||||
final Function function = new Function(
|
||||
FUNC_FLASH,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
|
||||
new org.web3j.abi.datatypes.generated.Uint256(amount0),
|
||||
new org.web3j.abi.datatypes.generated.Uint256(amount1),
|
||||
new org.web3j.abi.datatypes.DynamicBytes(data)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> increaseObservationCardinalityNext(
|
||||
BigInteger observationCardinalityNext) {
|
||||
final Function function = new Function(
|
||||
FUNC_INCREASEOBSERVATIONCARDINALITYNEXT,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint16(observationCardinalityNext)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> initialize(BigInteger sqrtPriceX96) {
|
||||
final Function function = new Function(
|
||||
FUNC_INITIALIZE,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceX96)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<BigInteger> liquidity() {
|
||||
final Function function = new Function(FUNC_LIQUIDITY,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<BigInteger> maxLiquidityPerTick() {
|
||||
final Function function = new Function(FUNC_MAXLIQUIDITYPERTICK,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> mint(String recipient, BigInteger tickLower,
|
||||
BigInteger tickUpper, BigInteger amount, byte[] data) {
|
||||
final Function function = new Function(
|
||||
FUNC_MINT,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
|
||||
new org.web3j.abi.datatypes.generated.Int24(tickLower),
|
||||
new org.web3j.abi.datatypes.generated.Int24(tickUpper),
|
||||
new org.web3j.abi.datatypes.generated.Uint128(amount),
|
||||
new org.web3j.abi.datatypes.DynamicBytes(data)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>> observations(
|
||||
BigInteger param0) {
|
||||
final Function function = new Function(FUNC_OBSERVATIONS,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint256(param0)),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint32>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Bool>() {}));
|
||||
return new RemoteFunctionCall<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>(function,
|
||||
new Callable<Tuple4<BigInteger, BigInteger, BigInteger, Boolean>>() {
|
||||
@Override
|
||||
public Tuple4<BigInteger, BigInteger, BigInteger, Boolean> call() throws
|
||||
Exception {
|
||||
List<Type> results = executeCallMultipleValueReturn(function);
|
||||
return new Tuple4<BigInteger, BigInteger, BigInteger, Boolean>(
|
||||
(BigInteger) results.get(0).getValue(),
|
||||
(BigInteger) results.get(1).getValue(),
|
||||
(BigInteger) results.get(2).getValue(),
|
||||
(Boolean) results.get(3).getValue());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>> observe(
|
||||
List<BigInteger> secondsAgos) {
|
||||
final Function function = new Function(FUNC_OBSERVE,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.DynamicArray<org.web3j.abi.datatypes.generated.Uint32>(
|
||||
org.web3j.abi.datatypes.generated.Uint32.class,
|
||||
org.web3j.abi.Utils.typeMap(secondsAgos, org.web3j.abi.datatypes.generated.Uint32.class))),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<DynamicArray<Int56>>() {}, new TypeReference<DynamicArray<Uint160>>() {}));
|
||||
return new RemoteFunctionCall<Tuple2<List<BigInteger>, List<BigInteger>>>(function,
|
||||
new Callable<Tuple2<List<BigInteger>, List<BigInteger>>>() {
|
||||
@Override
|
||||
public Tuple2<List<BigInteger>, List<BigInteger>> call() throws Exception {
|
||||
List<Type> results = executeCallMultipleValueReturn(function);
|
||||
return new Tuple2<List<BigInteger>, List<BigInteger>>(
|
||||
convertToNative((List<Int56>) results.get(0).getValue()),
|
||||
convertToNative((List<Uint160>) results.get(1).getValue()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>> positions(
|
||||
byte[] param0) {
|
||||
final Function function = new Function(FUNC_POSITIONS,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Bytes32(param0)),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
|
||||
return new RemoteFunctionCall<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>(function,
|
||||
new Callable<Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>>() {
|
||||
@Override
|
||||
public Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger> call()
|
||||
throws Exception {
|
||||
List<Type> results = executeCallMultipleValueReturn(function);
|
||||
return new Tuple5<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger>(
|
||||
(BigInteger) results.get(0).getValue(),
|
||||
(BigInteger) results.get(1).getValue(),
|
||||
(BigInteger) results.get(2).getValue(),
|
||||
(BigInteger) results.get(3).getValue(),
|
||||
(BigInteger) results.get(4).getValue());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<Tuple2<BigInteger, BigInteger>> protocolFees() {
|
||||
final Function function = new Function(FUNC_PROTOCOLFEES,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Uint128>() {}));
|
||||
return new RemoteFunctionCall<Tuple2<BigInteger, BigInteger>>(function,
|
||||
new Callable<Tuple2<BigInteger, BigInteger>>() {
|
||||
@Override
|
||||
public Tuple2<BigInteger, BigInteger> call() throws Exception {
|
||||
List<Type> results = executeCallMultipleValueReturn(function);
|
||||
return new Tuple2<BigInteger, BigInteger>(
|
||||
(BigInteger) results.get(0).getValue(),
|
||||
(BigInteger) results.get(1).getValue());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> setFeeProtocol(BigInteger feeProtocol0,
|
||||
BigInteger feeProtocol1) {
|
||||
final Function function = new Function(
|
||||
FUNC_SETFEEPROTOCOL,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Uint8(feeProtocol0),
|
||||
new org.web3j.abi.datatypes.generated.Uint8(feeProtocol1)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> slot0(
|
||||
) {
|
||||
final Function function = new Function(FUNC_SLOT0,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint160>() {}, new TypeReference<Int24>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint16>() {}, new TypeReference<Uint8>() {}, new TypeReference<Bool>() {}));
|
||||
return new RemoteFunctionCall<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
|
||||
new Callable<Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
|
||||
@Override
|
||||
public Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
|
||||
) throws Exception {
|
||||
List<Type> results = executeCallMultipleValueReturn(function);
|
||||
return new Tuple7<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
|
||||
(BigInteger) results.get(0).getValue(),
|
||||
(BigInteger) results.get(1).getValue(),
|
||||
(BigInteger) results.get(2).getValue(),
|
||||
(BigInteger) results.get(3).getValue(),
|
||||
(BigInteger) results.get(4).getValue(),
|
||||
(BigInteger) results.get(5).getValue(),
|
||||
(Boolean) results.get(6).getValue());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>> snapshotCumulativesInside(
|
||||
BigInteger tickLower, BigInteger tickUpper) {
|
||||
final Function function = new Function(FUNC_SNAPSHOTCUMULATIVESINSIDE,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(tickLower),
|
||||
new org.web3j.abi.datatypes.generated.Int24(tickUpper)),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}));
|
||||
return new RemoteFunctionCall<Tuple3<BigInteger, BigInteger, BigInteger>>(function,
|
||||
new Callable<Tuple3<BigInteger, BigInteger, BigInteger>>() {
|
||||
@Override
|
||||
public Tuple3<BigInteger, BigInteger, BigInteger> call() throws Exception {
|
||||
List<Type> results = executeCallMultipleValueReturn(function);
|
||||
return new Tuple3<BigInteger, BigInteger, BigInteger>(
|
||||
(BigInteger) results.get(0).getValue(),
|
||||
(BigInteger) results.get(1).getValue(),
|
||||
(BigInteger) results.get(2).getValue());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<TransactionReceipt> swap(String recipient, Boolean zeroForOne,
|
||||
BigInteger amountSpecified, BigInteger sqrtPriceLimitX96, byte[] data) {
|
||||
final Function function = new Function(
|
||||
FUNC_SWAP,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(160, recipient),
|
||||
new org.web3j.abi.datatypes.Bool(zeroForOne),
|
||||
new org.web3j.abi.datatypes.generated.Int256(amountSpecified),
|
||||
new org.web3j.abi.datatypes.generated.Uint160(sqrtPriceLimitX96),
|
||||
new org.web3j.abi.datatypes.DynamicBytes(data)),
|
||||
Collections.<TypeReference<?>>emptyList());
|
||||
return executeRemoteCallTransaction(function);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<BigInteger> tickBitmap(BigInteger param0) {
|
||||
final Function function = new Function(FUNC_TICKBITMAP,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int16(param0)),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<BigInteger> tickSpacing() {
|
||||
final Function function = new Function(FUNC_TICKSPACING,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Int24>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>> ticks(
|
||||
BigInteger param0) {
|
||||
final Function function = new Function(FUNC_TICKS,
|
||||
Arrays.<Type>asList(new org.web3j.abi.datatypes.generated.Int24(param0)),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Uint128>() {}, new TypeReference<Int128>() {}, new TypeReference<Uint256>() {}, new TypeReference<Uint256>() {}, new TypeReference<Int56>() {}, new TypeReference<Uint160>() {}, new TypeReference<Uint32>() {}, new TypeReference<Bool>() {}));
|
||||
return new RemoteFunctionCall<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>(function,
|
||||
new Callable<Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>>() {
|
||||
@Override
|
||||
public Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean> call(
|
||||
) throws Exception {
|
||||
List<Type> results = executeCallMultipleValueReturn(function);
|
||||
return new Tuple8<BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, BigInteger, Boolean>(
|
||||
(BigInteger) results.get(0).getValue(),
|
||||
(BigInteger) results.get(1).getValue(),
|
||||
(BigInteger) results.get(2).getValue(),
|
||||
(BigInteger) results.get(3).getValue(),
|
||||
(BigInteger) results.get(4).getValue(),
|
||||
(BigInteger) results.get(5).getValue(),
|
||||
(BigInteger) results.get(6).getValue(),
|
||||
(Boolean) results.get(7).getValue());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<String> token0() {
|
||||
final Function function = new Function(FUNC_TOKEN0,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, String.class);
|
||||
}
|
||||
|
||||
public RemoteFunctionCall<String> token1() {
|
||||
final Function function = new Function(FUNC_TOKEN1,
|
||||
Arrays.<Type>asList(),
|
||||
Arrays.<TypeReference<?>>asList(new TypeReference<Address>() {}));
|
||||
return executeRemoteCallSingleValueReturn(function, String.class);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
|
||||
BigInteger gasPrice, BigInteger gasLimit) {
|
||||
return new UniswapV3Pool(contractAddress, web3j, credentials, gasPrice, gasLimit);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
|
||||
TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
|
||||
return new UniswapV3Pool(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
|
||||
}
|
||||
|
||||
public static UniswapV3Pool load(String contractAddress, Web3j web3j, Credentials credentials,
|
||||
ContractGasProvider contractGasProvider) {
|
||||
return new UniswapV3Pool(contractAddress, web3j, credentials, contractGasProvider);
|
||||
}
|
||||
|
||||
public static UniswapV3Pool load(String contractAddress, Web3j web3j,
|
||||
TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
|
||||
return new UniswapV3Pool(contractAddress, web3j, transactionManager, contractGasProvider);
|
||||
}
|
||||
|
||||
public static class BurnEventResponse extends BaseEventResponse {
|
||||
public String owner;
|
||||
|
||||
public BigInteger tickLower;
|
||||
|
||||
public BigInteger tickUpper;
|
||||
|
||||
public BigInteger amount;
|
||||
|
||||
public BigInteger amount0;
|
||||
|
||||
public BigInteger amount1;
|
||||
}
|
||||
|
||||
public static class CollectEventResponse extends BaseEventResponse {
|
||||
public String owner;
|
||||
|
||||
public BigInteger tickLower;
|
||||
|
||||
public BigInteger tickUpper;
|
||||
|
||||
public String recipient;
|
||||
|
||||
public BigInteger amount0;
|
||||
|
||||
public BigInteger amount1;
|
||||
}
|
||||
|
||||
public static class CollectProtocolEventResponse extends BaseEventResponse {
|
||||
public String sender;
|
||||
|
||||
public String recipient;
|
||||
|
||||
public BigInteger amount0;
|
||||
|
||||
public BigInteger amount1;
|
||||
}
|
||||
|
||||
public static class FlashEventResponse extends BaseEventResponse {
|
||||
public String sender;
|
||||
|
||||
public String recipient;
|
||||
|
||||
public BigInteger amount0;
|
||||
|
||||
public BigInteger amount1;
|
||||
|
||||
public BigInteger paid0;
|
||||
|
||||
public BigInteger paid1;
|
||||
}
|
||||
|
||||
public static class IncreaseObservationCardinalityNextEventResponse extends BaseEventResponse {
|
||||
public BigInteger observationCardinalityNextOld;
|
||||
|
||||
public BigInteger observationCardinalityNextNew;
|
||||
}
|
||||
|
||||
public static class InitializeEventResponse extends BaseEventResponse {
|
||||
public BigInteger sqrtPriceX96;
|
||||
|
||||
public BigInteger tick;
|
||||
}
|
||||
|
||||
public static class MintEventResponse extends BaseEventResponse {
|
||||
public String owner;
|
||||
|
||||
public BigInteger tickLower;
|
||||
|
||||
public BigInteger tickUpper;
|
||||
|
||||
public String sender;
|
||||
|
||||
public BigInteger amount;
|
||||
|
||||
public BigInteger amount0;
|
||||
|
||||
public BigInteger amount1;
|
||||
}
|
||||
|
||||
public static class SetFeeProtocolEventResponse extends BaseEventResponse {
|
||||
public BigInteger feeProtocol0Old;
|
||||
|
||||
public BigInteger feeProtocol1Old;
|
||||
|
||||
public BigInteger feeProtocol0New;
|
||||
|
||||
public BigInteger feeProtocol1New;
|
||||
}
|
||||
|
||||
public static class SwapEventResponse extends BaseEventResponse {
|
||||
public String sender;
|
||||
|
||||
public String recipient;
|
||||
|
||||
public BigInteger amount0;
|
||||
|
||||
public BigInteger amount1;
|
||||
|
||||
public BigInteger sqrtPriceX96;
|
||||
|
||||
public BigInteger liquidity;
|
||||
|
||||
public BigInteger tick;
|
||||
}
|
||||
}
|
||||
@@ -2,4 +2,11 @@ package stream.dto;
|
||||
|
||||
public class AddressId extends ChainId {
|
||||
public String address;
|
||||
|
||||
public AddressId() {}
|
||||
|
||||
public AddressId(int chainId, String address) {
|
||||
super(chainId);
|
||||
this.address = address;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,4 +12,9 @@ public class BlockHash extends BlockId {
|
||||
public Object getId() {
|
||||
return this.hash;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.hash;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,4 +12,9 @@ public class BlockNumber extends BlockId {
|
||||
public Object getId() {
|
||||
return this.number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.valueOf(this.number);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,4 +4,10 @@ import java.io.Serializable;
|
||||
|
||||
public class ChainId implements Serializable {
|
||||
public int chainId;
|
||||
|
||||
public ChainId() {}
|
||||
|
||||
public ChainId(int chainId) {
|
||||
this.chainId = chainId;
|
||||
}
|
||||
}
|
||||
|
||||
42
src/main/java/stream/dto/ElaboratedSwapEvent.java
Normal file
42
src/main/java/stream/dto/ElaboratedSwapEvent.java
Normal file
@@ -0,0 +1,42 @@
|
||||
package stream.dto;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class ElaboratedSwapEvent implements Serializable {
|
||||
private SwapEventLog swapEvent;
|
||||
private String token0Address;
|
||||
private String token1Address;
|
||||
|
||||
public ElaboratedSwapEvent() {
|
||||
}
|
||||
|
||||
public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) {
|
||||
this.swapEvent = swapEvent;
|
||||
this.token0Address = token0Address;
|
||||
this.token1Address = token1Address;
|
||||
}
|
||||
|
||||
public SwapEventLog getSwapEvent() {
|
||||
return swapEvent;
|
||||
}
|
||||
|
||||
public void setSwapEvent(SwapEventLog swapEvent) {
|
||||
this.swapEvent = swapEvent;
|
||||
}
|
||||
|
||||
public String getToken0Address() {
|
||||
return token0Address;
|
||||
}
|
||||
|
||||
public void setToken0Address(String token0Address) {
|
||||
this.token0Address = token0Address;
|
||||
}
|
||||
|
||||
public String getToken1Address() {
|
||||
return token1Address;
|
||||
}
|
||||
|
||||
public void setToken1Address(String token1Address) {
|
||||
this.token1Address = token1Address;
|
||||
}
|
||||
}
|
||||
@@ -1,38 +0,0 @@
|
||||
package stream.dto;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class EnrichedSwapEventLog extends SwapEventLog implements Serializable {
|
||||
|
||||
// Token metadata for token0
|
||||
public Token token0Metadata;
|
||||
|
||||
// Token metadata for token1
|
||||
public Token token1Metadata;
|
||||
|
||||
public EnrichedSwapEventLog() {
|
||||
super();
|
||||
}
|
||||
|
||||
public static EnrichedSwapEventLog fromSwapEvent(SwapEventLog swapEvent) {
|
||||
EnrichedSwapEventLog enriched = new EnrichedSwapEventLog();
|
||||
|
||||
// Copy all base swap event fields
|
||||
enriched.address = swapEvent.address;
|
||||
enriched.sender = swapEvent.sender;
|
||||
enriched.recipient = swapEvent.recipient;
|
||||
enriched.amount0 = swapEvent.amount0;
|
||||
enriched.amount1 = swapEvent.amount1;
|
||||
enriched.sqrtPriceX96 = swapEvent.sqrtPriceX96;
|
||||
enriched.liquidity = swapEvent.liquidity;
|
||||
enriched.tick = swapEvent.tick;
|
||||
enriched.token0 = swapEvent.token0;
|
||||
enriched.token1 = swapEvent.token1;
|
||||
|
||||
// Copy EventLog fields
|
||||
enriched.blockNumber = swapEvent.blockNumber;
|
||||
enriched.transactionHash = swapEvent.transactionHash;
|
||||
|
||||
return enriched;
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
package stream.dto;
|
||||
|
||||
enum Exchange {
|
||||
public enum Exchange {
|
||||
UNISWAP_V2,
|
||||
UNISWAP_V3,
|
||||
UNISWAP_V4,
|
||||
|
||||
42
src/main/java/stream/dto/FullyElaboratedSwap.java
Normal file
42
src/main/java/stream/dto/FullyElaboratedSwap.java
Normal file
@@ -0,0 +1,42 @@
|
||||
package stream.dto;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class FullyElaboratedSwap implements Serializable {
|
||||
private ElaboratedSwapEvent swapEvent;
|
||||
private Token token0;
|
||||
private Token token1;
|
||||
|
||||
public FullyElaboratedSwap() {
|
||||
}
|
||||
|
||||
public FullyElaboratedSwap(ElaboratedSwapEvent swapEvent, Token token0, Token token1) {
|
||||
this.swapEvent = swapEvent;
|
||||
this.token0 = token0;
|
||||
this.token1 = token1;
|
||||
}
|
||||
|
||||
public ElaboratedSwapEvent getSwapEvent() {
|
||||
return swapEvent;
|
||||
}
|
||||
|
||||
public void setSwapEvent(ElaboratedSwapEvent swapEvent) {
|
||||
this.swapEvent = swapEvent;
|
||||
}
|
||||
|
||||
public Token getToken0() {
|
||||
return token0;
|
||||
}
|
||||
|
||||
public void setToken0(Token token0) {
|
||||
this.token0 = token0;
|
||||
}
|
||||
|
||||
public Token getToken1() {
|
||||
return token1;
|
||||
}
|
||||
|
||||
public void setToken1(Token token1) {
|
||||
this.token1 = token1;
|
||||
}
|
||||
}
|
||||
@@ -15,4 +15,16 @@ public class Swap extends ChainId {
|
||||
String makerAsset; // token address
|
||||
BigDecimal amount; // positive means the taker bought; negative means the taker sold.
|
||||
BigDecimal price;
|
||||
|
||||
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
|
||||
BigDecimal amount, BigDecimal price) {
|
||||
super(chainId);
|
||||
this.time = time;
|
||||
this.exchange = exchange;
|
||||
this.pool = pool;
|
||||
this.takerAsset = takerAsset;
|
||||
this.makerAsset = makerAsset;
|
||||
this.amount = amount;
|
||||
this.price = price;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,4 +38,8 @@ public class SwapEventLog extends EventLog implements Serializable {
|
||||
@BigInt
|
||||
public BigInteger liquidity;
|
||||
public int tick;
|
||||
|
||||
public String getAddress() {
|
||||
return this.address;
|
||||
}
|
||||
}
|
||||
@@ -6,31 +6,22 @@ public class Token extends AddressId {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private String name;
|
||||
private String symbol;
|
||||
private int decimals;
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
public String name;
|
||||
public String symbol;
|
||||
public int decimals;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public Token() {}
|
||||
|
||||
public Token(int chainId, String address, String name, String symbol, int decimals) {
|
||||
super(chainId, address);
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getSymbol() {
|
||||
return symbol;
|
||||
}
|
||||
|
||||
public void setSymbol(String symbol) {
|
||||
this.symbol = symbol;
|
||||
}
|
||||
|
||||
public int getDecimals() {
|
||||
return decimals;
|
||||
}
|
||||
|
||||
public void setDecimals(int decimals) {
|
||||
this.decimals = decimals;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("Token[%s \"%s\" (%s) .%d]", this.address, this.name, this.symbol, this.decimals);
|
||||
}
|
||||
}
|
||||
|
||||
52
src/main/java/stream/io/BlockElaborator.java
Normal file
52
src/main/java/stream/io/BlockElaborator.java
Normal file
@@ -0,0 +1,52 @@
|
||||
package stream.io;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.web3j.protocol.Web3j;
|
||||
import org.web3j.protocol.core.DefaultBlockParameterNumber;
|
||||
import org.web3j.protocol.core.methods.response.EthBlock;
|
||||
import stream.dto.BlockHash;
|
||||
import stream.dto.BlockId;
|
||||
import stream.dto.BlockNumber;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
|
||||
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
|
||||
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
|
||||
private transient Web3j w3;
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
|
||||
w3 = Web3Client.get(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return getBlock(blockId);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to get block {} on chain {}", blockId, e);
|
||||
throw new RuntimeException("Error processing block " + blockId, e);
|
||||
}
|
||||
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
|
||||
}
|
||||
|
||||
private EthBlock getBlock(BlockId id) {
|
||||
try {
|
||||
return (id instanceof BlockNumber) ?
|
||||
w3.ethGetBlockByNumber(new DefaultBlockParameterNumber(((BlockNumber) id).number), false).sendAsync().get() :
|
||||
w3.ethGetBlockByHash(((BlockHash) id).hash, false).sendAsync().get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,9 @@
|
||||
package stream.io;
|
||||
|
||||
import java.util.HexFormat;
|
||||
|
||||
import org.bouncycastle.jcajce.provider.digest.Keccak;
|
||||
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
|
||||
|
||||
|
||||
public class EthUtils {
|
||||
public static boolean isValidAddress(String address) {
|
||||
@@ -35,6 +37,6 @@ public class EthUtils {
|
||||
|
||||
public static String keccak(String message) {
|
||||
byte[] hash = new Keccak.Digest256().digest(message.getBytes());
|
||||
return "0x"+ByteUtils.toHexString(hash);
|
||||
return "0x" + HexFormat.of().formatHex(hash);
|
||||
}
|
||||
}
|
||||
|
||||
34
src/main/java/stream/io/JedisClient.java
Normal file
34
src/main/java/stream/io/JedisClient.java
Normal file
@@ -0,0 +1,34 @@
|
||||
package stream.io;
|
||||
|
||||
import redis.clients.jedis.ConnectionPoolConfig;
|
||||
import redis.clients.jedis.JedisPooled;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
|
||||
public class JedisClient {
|
||||
protected static volatile JedisPooled jedis = null;
|
||||
private static final Object jedisLock = new Object();
|
||||
|
||||
static public JedisPooled get(Map<String, String> params) {
|
||||
if (jedis == null) {
|
||||
synchronized (jedisLock) {
|
||||
String url = params.getOrDefault("redis_url", "http://localhost:6379");
|
||||
int connections = Integer.parseInt(params.getOrDefault("redis_connections", "8"));
|
||||
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
|
||||
poolConfig.setMaxTotal(connections);
|
||||
poolConfig.setMaxIdle(connections);
|
||||
poolConfig.setMinIdle(0);
|
||||
poolConfig.setBlockWhenExhausted(true);
|
||||
poolConfig.setMaxWait(Duration.ofSeconds(5));
|
||||
// Enables sending a PING command periodically while the connection is idle.
|
||||
poolConfig.setTestWhileIdle(true);
|
||||
// controls the period between checks for idle connections in the pool
|
||||
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(10));
|
||||
jedis = new JedisPooled(poolConfig, url);
|
||||
}
|
||||
}
|
||||
return jedis;
|
||||
}
|
||||
|
||||
}
|
||||
80
src/main/java/stream/io/PoolElaborator.java
Normal file
80
src/main/java/stream/io/PoolElaborator.java
Normal file
@@ -0,0 +1,80 @@
|
||||
package stream.io;
|
||||
|
||||
import stream.dto.SwapEventLog;
|
||||
import stream.dto.ElaboratedSwapEvent;
|
||||
import stream.contract.UniswapV3Pool;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import org.web3j.protocol.Web3j;
|
||||
import org.web3j.protocol.http.HttpService;
|
||||
import org.web3j.crypto.Credentials;
|
||||
import org.web3j.tx.gas.DefaultGasProvider;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.Collections;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.Map;
|
||||
|
||||
public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSwapEvent> {
|
||||
private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class);
|
||||
private transient Web3j web3j;
|
||||
private transient Credentials credentials;
|
||||
private transient DefaultGasProvider gasProvider;
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
super.open(openContext);
|
||||
|
||||
// Get RPC URL from job parameters
|
||||
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
|
||||
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||
// TODO: Get from configuration if needed
|
||||
|
||||
// Initialize Web3j
|
||||
this.web3j = Web3j.build(new HttpService(rpcUrl));
|
||||
|
||||
// Dummy credentials for read-only operations
|
||||
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
|
||||
|
||||
// Default gas provider
|
||||
this.gasProvider = new DefaultGasProvider();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(SwapEventLog swap, ResultFuture<ElaboratedSwapEvent> resultFuture) throws Exception {
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
// Load the pool contract
|
||||
UniswapV3Pool pool = UniswapV3Pool.load(
|
||||
swap.getAddress(), // Pool address from the event
|
||||
web3j,
|
||||
credentials,
|
||||
gasProvider
|
||||
);
|
||||
|
||||
// Get token addresses
|
||||
String token0 = pool.token0().send();
|
||||
String token1 = pool.token1().send();
|
||||
|
||||
// Create enriched event
|
||||
return new ElaboratedSwapEvent(swap, token0, token1);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Error fetching pool tokens", e);
|
||||
// Return original without enrichment
|
||||
return new ElaboratedSwapEvent(swap, null, null);
|
||||
}
|
||||
}).thenAccept(enriched -> {
|
||||
resultFuture.complete(Collections.singletonList(enriched));
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (web3j != null) {
|
||||
web3j.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
package stream.io;
|
||||
|
||||
public class PoolTokenFetcher {
|
||||
// Uniswap V3 pool method signatures
|
||||
private static final String TOKEN0_SIGNATURE = "0x0dfe1681"; // token0()
|
||||
private static final String TOKEN1_SIGNATURE = "0xd21220a7"; // token1()
|
||||
|
||||
private final JsonRpcClient jsonRpcClient;
|
||||
|
||||
public PoolTokenFetcher(JsonRpcClient jsonRpcClient) {
|
||||
this.jsonRpcClient = jsonRpcClient;
|
||||
}
|
||||
|
||||
public String fetchToken0(String poolAddress) throws Throwable {
|
||||
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN0_SIGNATURE);
|
||||
Object[] params = new Object[]{request, "latest"};
|
||||
String result = jsonRpcClient.invoke("eth_call", params, String.class);
|
||||
return decodeAddress(result);
|
||||
}
|
||||
|
||||
public String fetchToken1(String poolAddress) throws Throwable {
|
||||
EthCallRequest request = new EthCallRequest(poolAddress, TOKEN1_SIGNATURE);
|
||||
Object[] params = new Object[]{request, "latest"};
|
||||
String result = jsonRpcClient.invoke("eth_call", params, String.class);
|
||||
return decodeAddress(result);
|
||||
}
|
||||
|
||||
private String decodeAddress(String hexResult) {
|
||||
if (hexResult == null || hexResult.equals("0x") || hexResult.length() < 42) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Address is the last 20 bytes (40 hex chars) of the result
|
||||
// Remove 0x prefix and get last 40 characters, then add 0x back
|
||||
String hex = hexResult.substring(2);
|
||||
if (hex.length() >= 40) {
|
||||
return "0x" + hex.substring(hex.length() - 40);
|
||||
}
|
||||
|
||||
return hexResult; // Return as-is if unexpected format
|
||||
}
|
||||
|
||||
private static class EthCallRequest {
|
||||
public final String to;
|
||||
public final String data;
|
||||
|
||||
public EthCallRequest(String to, String data) {
|
||||
this.to = to;
|
||||
this.data = data;
|
||||
}
|
||||
}
|
||||
}
|
||||
94
src/main/java/stream/io/SwapEnricher.java
Normal file
94
src/main/java/stream/io/SwapEnricher.java
Normal file
@@ -0,0 +1,94 @@
|
||||
|
||||
package stream.io;
|
||||
|
||||
import org.apache.flink.api.common.functions.RichMapFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import stream.dto.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.math.MathContext;
|
||||
import java.math.RoundingMode;
|
||||
|
||||
public class SwapEnricher extends RichMapFunction<FullyElaboratedSwap, Swap> {
|
||||
private static final Logger log = LoggerFactory.getLogger(SwapEnricher.class);
|
||||
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
|
||||
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
|
||||
|
||||
@Override
|
||||
public Swap map(FullyElaboratedSwap fullSwap) throws Exception {
|
||||
ElaboratedSwapEvent event = fullSwap.getSwapEvent();
|
||||
SwapEventLog swapLog = event.getSwapEvent();
|
||||
Token token0 = fullSwap.getToken0();
|
||||
Token token1 = fullSwap.getToken1();
|
||||
|
||||
// For now, hardcode exchange as UNISWAP_V3 and time as current time
|
||||
Long time = System.currentTimeMillis();
|
||||
Exchange exchange = Exchange.UNISWAP_V3;
|
||||
|
||||
// Pool address
|
||||
String pool = swapLog.address;
|
||||
|
||||
// Get sqrtPriceX96 and calculate actual price
|
||||
BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96);
|
||||
BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT);
|
||||
BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT);
|
||||
|
||||
// Adjust price for decimals (price is token1/token0)
|
||||
int decimalDiff = token0.decimals - token1.decimals;
|
||||
BigDecimal adjustedPrice;
|
||||
if (decimalDiff >= 0) {
|
||||
BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff);
|
||||
adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT);
|
||||
} else {
|
||||
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
|
||||
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
|
||||
}
|
||||
|
||||
// Determine which token is in and which is out based on amount signs
|
||||
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
|
||||
|
||||
String takerAsset;
|
||||
String makerAsset;
|
||||
BigDecimal amountIn;
|
||||
BigDecimal amountOut;
|
||||
BigDecimal finalPrice;
|
||||
|
||||
if (isToken0In) {
|
||||
// User is sending token0, receiving token1
|
||||
takerAsset = event.getToken0Address();
|
||||
makerAsset = event.getToken1Address();
|
||||
|
||||
// Convert amounts to human-readable format using decimals
|
||||
amountIn = new BigDecimal(swapLog.amount0.abs())
|
||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||
amountOut = new BigDecimal(swapLog.amount1)
|
||||
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
|
||||
|
||||
// Price is how much token1 you get per token0
|
||||
finalPrice = adjustedPrice;
|
||||
} else {
|
||||
// User is sending token1, receiving token0
|
||||
takerAsset = event.getToken1Address();
|
||||
makerAsset = event.getToken0Address();
|
||||
|
||||
// Convert amounts to human-readable format using decimals
|
||||
amountIn = new BigDecimal(swapLog.amount1.abs())
|
||||
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
|
||||
amountOut = new BigDecimal(swapLog.amount0)
|
||||
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||
|
||||
// Price is how much token0 you get per token1 (inverse of adjustedPrice)
|
||||
finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
|
||||
}
|
||||
|
||||
log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
|
||||
pool, token0.symbol + "/" + token1.symbol,
|
||||
isToken0In ? token1.symbol : token0.symbol,
|
||||
amountIn, amountOut, finalPrice);
|
||||
|
||||
// Pass both amountIn and amountOut to constructor with unit price
|
||||
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
|
||||
}
|
||||
}
|
||||
@@ -1,161 +0,0 @@
|
||||
package stream.io;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import stream.dto.EnrichedSwapEventLog;
|
||||
import stream.dto.SwapEventLog;
|
||||
import stream.dto.Token;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class SwapEventEnricher extends RichAsyncFunction<SwapEventLog, EnrichedSwapEventLog> {
|
||||
|
||||
private transient JsonRpcClient jsonRpcClient;
|
||||
private transient PoolTokenFetcher poolTokenFetcher;
|
||||
private transient TokenMetadataFetcher tokenMetadataFetcher;
|
||||
private transient TokenCache tokenCache;
|
||||
private final String rpcUrl;
|
||||
|
||||
public SwapEventEnricher(String rpcUrl) {
|
||||
this.rpcUrl = rpcUrl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
super.open(openContext);
|
||||
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
|
||||
this.poolTokenFetcher = new PoolTokenFetcher(jsonRpcClient);
|
||||
this.tokenMetadataFetcher = new TokenMetadataFetcher(rpcUrl);
|
||||
this.tokenMetadataFetcher.open(openContext);
|
||||
this.tokenCache = new TokenCache();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<EnrichedSwapEventLog> resultFuture) throws Exception {
|
||||
try {
|
||||
// First, get token addresses if not already populated
|
||||
CompletableFuture<Void> tokenAddressFuture;
|
||||
|
||||
if (swapEvent.token0 == null || swapEvent.token1 == null) {
|
||||
tokenAddressFuture = CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
if (swapEvent.token0 == null) {
|
||||
swapEvent.token0 = poolTokenFetcher.fetchToken0(swapEvent.address);
|
||||
}
|
||||
if (swapEvent.token1 == null) {
|
||||
swapEvent.token1 = poolTokenFetcher.fetchToken1(swapEvent.address);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException("Error fetching token addresses: " + e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
tokenAddressFuture = CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
// Then fetch metadata for both tokens
|
||||
tokenAddressFuture.thenCompose(v -> {
|
||||
if (swapEvent.token0 == null || swapEvent.token1 == null) {
|
||||
return CompletableFuture.failedFuture(new RuntimeException("Failed to get token addresses"));
|
||||
}
|
||||
|
||||
// Fetch metadata for both tokens concurrently (with caching)
|
||||
CompletableFuture<Token> token0MetaFuture = tokenCache.getOrFetch(swapEvent.token0, tokenMetadataFetcher);
|
||||
CompletableFuture<Token> token1MetaFuture = tokenCache.getOrFetch(swapEvent.token1, tokenMetadataFetcher);
|
||||
|
||||
return CompletableFuture.allOf(token0MetaFuture, token1MetaFuture)
|
||||
.thenApply(ignored -> {
|
||||
try {
|
||||
Token token0Meta = token0MetaFuture.get();
|
||||
Token token1Meta = token1MetaFuture.get();
|
||||
|
||||
// Create enriched event with token metadata
|
||||
EnrichedSwapEventLog enriched = EnrichedSwapEventLog.fromSwapEvent(swapEvent);
|
||||
|
||||
// Set token metadata objects
|
||||
if (token0Meta != null) {
|
||||
enriched.token0Metadata = token0Meta;
|
||||
// Also set the address in the Token object
|
||||
token0Meta.address = swapEvent.token0;
|
||||
}
|
||||
|
||||
if (token1Meta != null) {
|
||||
enriched.token1Metadata = token1Meta;
|
||||
// Also set the address in the Token object
|
||||
token1Meta.address = swapEvent.token1;
|
||||
}
|
||||
|
||||
return enriched;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error creating enriched event: " + e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
})
|
||||
.thenAccept(enrichedEvent -> {
|
||||
resultFuture.complete(Collections.singleton(enrichedEvent));
|
||||
})
|
||||
.exceptionally(throwable -> {
|
||||
System.err.println("Error enriching swap event: " + throwable.getMessage());
|
||||
throwable.printStackTrace();
|
||||
resultFuture.completeExceptionally(throwable);
|
||||
return null;
|
||||
});
|
||||
|
||||
// Log cache stats periodically (every 100 events)
|
||||
if (System.currentTimeMillis() % 10000 < 100) {
|
||||
System.out.println("Token cache stats: " + tokenCache.getStats());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
resultFuture.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockResultFuture implements ResultFuture<Token> {
|
||||
private Token result;
|
||||
private Throwable error;
|
||||
private boolean complete = false;
|
||||
|
||||
@Override
|
||||
public void complete(java.util.Collection<Token> results) {
|
||||
if (!results.isEmpty()) {
|
||||
this.result = results.iterator().next();
|
||||
}
|
||||
this.complete = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
|
||||
try {
|
||||
java.util.Collection<Token> resultCollection = results.get();
|
||||
complete(resultCollection);
|
||||
} catch (Exception e) {
|
||||
completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeExceptionally(Throwable error) {
|
||||
this.error = error;
|
||||
this.complete = true;
|
||||
}
|
||||
|
||||
public boolean isComplete() {
|
||||
return complete;
|
||||
}
|
||||
|
||||
public boolean hasError() {
|
||||
return error != null;
|
||||
}
|
||||
|
||||
public Token getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public Throwable getError() {
|
||||
return error;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,142 +0,0 @@
|
||||
package stream.io;
|
||||
|
||||
import stream.dto.Token;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Thread-safe cache for token metadata to avoid repeated RPC calls
|
||||
* for the same token addresses.
|
||||
*/
|
||||
public class TokenCache {
|
||||
|
||||
private final ConcurrentHashMap<String, Token> cache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, CompletableFuture<Token>> pendingRequests = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Get token from cache if present, otherwise return null
|
||||
*/
|
||||
public Token get(String tokenAddress) {
|
||||
return cache.get(tokenAddress.toLowerCase());
|
||||
}
|
||||
|
||||
/**
|
||||
* Put token in cache
|
||||
*/
|
||||
public void put(String tokenAddress, Token token) {
|
||||
if (token != null && tokenAddress != null) {
|
||||
cache.put(tokenAddress.toLowerCase(), token);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if token is already in cache
|
||||
*/
|
||||
public boolean contains(String tokenAddress) {
|
||||
return cache.containsKey(tokenAddress.toLowerCase());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or fetch token metadata with deduplication.
|
||||
* If multiple requests come for the same token simultaneously,
|
||||
* only one will fetch and others will wait for the result.
|
||||
*/
|
||||
public CompletableFuture<Token> getOrFetch(String tokenAddress, TokenMetadataFetcher fetcher) {
|
||||
String normalizedAddress = tokenAddress.toLowerCase();
|
||||
|
||||
// Check cache first
|
||||
Token cachedToken = cache.get(normalizedAddress);
|
||||
if (cachedToken != null) {
|
||||
return CompletableFuture.completedFuture(cachedToken);
|
||||
}
|
||||
|
||||
// Check if there's already a pending request for this token
|
||||
CompletableFuture<Token> pendingFuture = pendingRequests.get(normalizedAddress);
|
||||
if (pendingFuture != null) {
|
||||
return pendingFuture;
|
||||
}
|
||||
|
||||
// Create new request
|
||||
CompletableFuture<Token> future = new CompletableFuture<>();
|
||||
|
||||
// Register the pending request
|
||||
CompletableFuture<Token> existingFuture = pendingRequests.putIfAbsent(normalizedAddress, future);
|
||||
if (existingFuture != null) {
|
||||
// Another thread beat us to it, return their future
|
||||
return existingFuture;
|
||||
}
|
||||
|
||||
// We're the first, so fetch the token metadata
|
||||
try {
|
||||
fetcher.asyncInvoke(tokenAddress, new org.apache.flink.streaming.api.functions.async.ResultFuture<Token>() {
|
||||
@Override
|
||||
public void complete(java.util.Collection<Token> results) {
|
||||
Token token = results.isEmpty() ? null : results.iterator().next();
|
||||
|
||||
// Cache the result (even if null to avoid repeated failures)
|
||||
if (token != null) {
|
||||
cache.put(normalizedAddress, token);
|
||||
}
|
||||
|
||||
// Complete the future
|
||||
future.complete(token);
|
||||
|
||||
// Remove from pending requests
|
||||
pendingRequests.remove(normalizedAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<Token> results) {
|
||||
try {
|
||||
java.util.Collection<Token> resultCollection = results.get();
|
||||
complete(resultCollection);
|
||||
} catch (Exception e) {
|
||||
completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeExceptionally(Throwable error) {
|
||||
future.completeExceptionally(error);
|
||||
pendingRequests.remove(normalizedAddress);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
pendingRequests.remove(normalizedAddress);
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get cache statistics for monitoring
|
||||
*/
|
||||
public CacheStats getStats() {
|
||||
return new CacheStats(cache.size(), pendingRequests.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all cached entries (useful for testing or memory management)
|
||||
*/
|
||||
public void clear() {
|
||||
cache.clear();
|
||||
// Note: We don't clear pending requests as they're in flight
|
||||
}
|
||||
|
||||
public static class CacheStats {
|
||||
public final int cachedTokens;
|
||||
public final int pendingRequests;
|
||||
|
||||
public CacheStats(int cachedTokens, int pendingRequests) {
|
||||
this.cachedTokens = cachedTokens;
|
||||
this.pendingRequests = pendingRequests;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("TokenCache[cached=%d, pending=%d]", cachedTokens, pendingRequests);
|
||||
}
|
||||
}
|
||||
}
|
||||
66
src/main/java/stream/io/TokenElaborator.java
Normal file
66
src/main/java/stream/io/TokenElaborator.java
Normal file
@@ -0,0 +1,66 @@
|
||||
package stream.io;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.web3j.protocol.Web3j;
|
||||
import org.web3j.tx.ReadonlyTransactionManager;
|
||||
import org.web3j.tx.gas.DefaultGasProvider;
|
||||
import stream.contract.ERC20;
|
||||
import stream.dto.AddressId;
|
||||
import stream.dto.Token;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
|
||||
public class TokenElaborator extends RichAsyncFunction<AddressId, Token> {
|
||||
private static final Logger log = LoggerFactory.getLogger(TokenElaborator.class);
|
||||
private transient Web3j w3;
|
||||
private DefaultGasProvider gasProvider;
|
||||
private ReadonlyTransactionManager transactionManager;
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
|
||||
w3 = Web3Client.get(params);
|
||||
gasProvider = new DefaultGasProvider();
|
||||
transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(AddressId address, final ResultFuture<Token> resultFuture) {
|
||||
log.info("Starting token processing for address: {} on chain: {}", address.address, address.chainId);
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return processToken(address);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to process token: {} on chain: {}", address.address, address.chainId, e);
|
||||
throw new RuntimeException("Error processing token: " + address, e);
|
||||
}
|
||||
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
|
||||
}
|
||||
|
||||
private Token processToken(AddressId address) {
|
||||
String name;
|
||||
String symbol;
|
||||
int decimals;
|
||||
ERC20 contract = ERC20.load(address.address, w3, transactionManager, gasProvider);
|
||||
try {
|
||||
CompletableFuture<BigInteger> decimalsAsync = contract.decimals().sendAsync();
|
||||
CompletableFuture<String> nameAsync = contract.name().sendAsync();
|
||||
CompletableFuture<String> symbolAsync = contract.symbol().sendAsync();
|
||||
decimals = decimalsAsync.get().intValue();
|
||||
name = nameAsync.get();
|
||||
symbol = symbolAsync.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return new Token(address.chainId, address.address, name, symbol, decimals);
|
||||
}
|
||||
}
|
||||
@@ -1,166 +0,0 @@
|
||||
package stream.io;
|
||||
|
||||
import org.apache.flink.api.common.functions.OpenContext;
|
||||
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||
import stream.dto.Token;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class TokenMetadataFetcher extends RichAsyncFunction<String, Token> {
|
||||
|
||||
// ERC-20 method signatures
|
||||
private static final String NAME_SIGNATURE = "0x06fdde03";
|
||||
private static final String SYMBOL_SIGNATURE = "0x95d89b41";
|
||||
private static final String DECIMALS_SIGNATURE = "0x313ce567";
|
||||
private static final String LATEST_BLOCK = "latest";
|
||||
|
||||
private transient JsonRpcClient jsonRpcClient;
|
||||
private final String rpcUrl;
|
||||
|
||||
public TokenMetadataFetcher(String rpcUrl) {
|
||||
this.rpcUrl = rpcUrl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(OpenContext openContext) throws Exception {
|
||||
super.open(openContext);
|
||||
this.jsonRpcClient = new JsonRpcClient(rpcUrl);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncInvoke(String tokenAddress, ResultFuture<Token> resultFuture) throws Exception {
|
||||
try {
|
||||
// Fetch all metadata concurrently
|
||||
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return fetchName(tokenAddress);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
CompletableFuture<String> symbolFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return fetchSymbol(tokenAddress);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
CompletableFuture<String> decimalsFuture = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return fetchDecimals(tokenAddress);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
// Combine all results
|
||||
CompletableFuture.allOf(nameFuture, symbolFuture, decimalsFuture)
|
||||
.thenAccept(v -> {
|
||||
try {
|
||||
Token token = new Token();
|
||||
token.setName(decodeString(nameFuture.get()));
|
||||
token.setSymbol(decodeString(symbolFuture.get()));
|
||||
token.setDecimals(decodeUint8(decimalsFuture.get()));
|
||||
|
||||
resultFuture.complete(Collections.singleton(token));
|
||||
} catch (Exception e) {
|
||||
resultFuture.completeExceptionally(e);
|
||||
}
|
||||
})
|
||||
.exceptionally(throwable -> {
|
||||
resultFuture.completeExceptionally(throwable);
|
||||
return null;
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
resultFuture.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String fetchName(String tokenAddress) throws Throwable {
|
||||
return makeEthCall(tokenAddress, NAME_SIGNATURE);
|
||||
}
|
||||
|
||||
private String fetchSymbol(String tokenAddress) throws Throwable {
|
||||
return makeEthCall(tokenAddress, SYMBOL_SIGNATURE);
|
||||
}
|
||||
|
||||
private String fetchDecimals(String tokenAddress) throws Throwable {
|
||||
return makeEthCall(tokenAddress, DECIMALS_SIGNATURE);
|
||||
}
|
||||
|
||||
private String makeEthCall(String toAddress, String data) throws Throwable {
|
||||
EthCallRequest request = new EthCallRequest(toAddress, data);
|
||||
Object[] params = new Object[]{request, LATEST_BLOCK};
|
||||
return jsonRpcClient.invoke("eth_call", params, String.class);
|
||||
}
|
||||
|
||||
private String decodeString(String hexData) {
|
||||
if (hexData == null || hexData.equals("0x") || hexData.length() < 66) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
try {
|
||||
// Remove 0x prefix
|
||||
String hex = hexData.substring(2);
|
||||
|
||||
// Skip the first 32 bytes (offset pointer)
|
||||
// The actual string length is at bytes 32-64
|
||||
String lengthHex = hex.substring(64, 128);
|
||||
int length = Integer.parseInt(lengthHex, 16);
|
||||
|
||||
if (length <= 0 || length > 100) { // Sanity check
|
||||
return "Invalid";
|
||||
}
|
||||
|
||||
// Extract the actual string data starting at byte 64
|
||||
String dataHex = hex.substring(128, 128 + (length * 2));
|
||||
|
||||
// Convert hex to string
|
||||
StringBuilder result = new StringBuilder();
|
||||
for (int i = 0; i < dataHex.length(); i += 2) {
|
||||
String hexChar = dataHex.substring(i, i + 2);
|
||||
int charCode = Integer.parseInt(hexChar, 16);
|
||||
if (charCode > 0) { // Skip null bytes
|
||||
result.append((char) charCode);
|
||||
}
|
||||
}
|
||||
|
||||
return result.toString();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error decoding string from hex: " + hexData + " - " + e.getMessage());
|
||||
return "DecodeError";
|
||||
}
|
||||
}
|
||||
|
||||
private int decodeUint8(String hexData) {
|
||||
if (hexData == null || hexData.equals("0x")) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
try {
|
||||
// Remove 0x prefix and get last byte (uint8)
|
||||
String hex = hexData.substring(2);
|
||||
// uint8 is the last 2 hex characters (1 byte)
|
||||
String uint8Hex = hex.substring(hex.length() - 2);
|
||||
return Integer.parseInt(uint8Hex, 16);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error decoding uint8 from hex: " + hexData + " - " + e.getMessage());
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private static class EthCallRequest {
|
||||
public final String to;
|
||||
public final String data;
|
||||
|
||||
public EthCallRequest(String to, String data) {
|
||||
this.to = to;
|
||||
this.data = data;
|
||||
}
|
||||
}
|
||||
}
|
||||
45
src/main/java/stream/io/Web3Client.java
Normal file
45
src/main/java/stream/io/Web3Client.java
Normal file
@@ -0,0 +1,45 @@
|
||||
package stream.io;
|
||||
|
||||
import okhttp3.ConnectionPool;
|
||||
import okhttp3.OkHttpClient;
|
||||
import org.web3j.protocol.Web3j;
|
||||
import org.web3j.protocol.http.HttpService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static okhttp3.ConnectionSpec.CLEARTEXT;
|
||||
import static okhttp3.ConnectionSpec.MODERN_TLS;
|
||||
|
||||
public class Web3Client {
|
||||
private static final Logger log = LoggerFactory.getLogger(Web3Client.class);
|
||||
private static volatile Web3j w3 = null;
|
||||
private static final Object w3Lock = new Object();
|
||||
|
||||
static public Web3j get(Map<String, String> params) {
|
||||
if (w3==null) {
|
||||
synchronized (w3Lock) {
|
||||
log.info("Initializing Web3 client");
|
||||
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5"));
|
||||
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60"));
|
||||
Duration timeout = Duration.ofSeconds(30);
|
||||
log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
|
||||
url, maxIdleConnections, keepAlive, timeout.getSeconds());
|
||||
var httpClient = new OkHttpClient.Builder()
|
||||
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
|
||||
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
|
||||
.connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES))
|
||||
.build();
|
||||
w3 = Web3j.build(new HttpService(url, httpClient));
|
||||
log.info("Web3 client initialized successfully");
|
||||
}
|
||||
}
|
||||
return w3;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user