TokenElaborator
This commit is contained in:
31
pom.xml
31
pom.xml
@@ -35,9 +35,11 @@ under the License.
|
|||||||
<scala.binary.version>2.12</scala.binary.version>
|
<scala.binary.version>2.12</scala.binary.version>
|
||||||
<flink.version>2.1.0</flink.version>
|
<flink.version>2.1.0</flink.version>
|
||||||
<log4j.version>2.24.1</log4j.version>
|
<log4j.version>2.24.1</log4j.version>
|
||||||
<web3j.version>5.0.0</web3j.version>
|
<web3j.version>5.0.1</web3j.version>
|
||||||
<httpclient5.version>5.3.1</httpclient5.version>
|
<httpclient5.version>5.3.1</httpclient5.version>
|
||||||
<jsonrpc4j.version>1.6</jsonrpc4j.version>
|
<jsonrpc4j.version>1.6</jsonrpc4j.version>
|
||||||
|
<jedis.version>6.1.0</jedis.version>
|
||||||
|
<jackson.version>2.17.1</jackson.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<repositories>
|
<repositories>
|
||||||
@@ -55,6 +57,22 @@ under the License.
|
|||||||
</repositories>
|
</repositories>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-core</artifactId>
|
||||||
|
<version>${jackson.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
<version>${jackson.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-annotations</artifactId>
|
||||||
|
<version>${jackson.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Apache Flink dependencies -->
|
<!-- Apache Flink dependencies -->
|
||||||
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
|
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
|
||||||
<dependency>
|
<dependency>
|
||||||
@@ -132,6 +150,13 @@ under the License.
|
|||||||
<version>${log4j.version}</version>
|
<version>${log4j.version}</version>
|
||||||
<scope>runtime</scope>
|
<scope>runtime</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>redis.clients</groupId>
|
||||||
|
<artifactId>jedis</artifactId>
|
||||||
|
<version>${jedis.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
@@ -143,8 +168,8 @@ under the License.
|
|||||||
<artifactId>maven-compiler-plugin</artifactId>
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
<version>3.1</version>
|
<version>3.1</version>
|
||||||
<configuration>
|
<configuration>
|
||||||
<source>16</source>
|
<source>${target.java.version}</source>
|
||||||
<target>16</target>
|
<target>${target.java.version}</target>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|
||||||
|
|||||||
@@ -19,17 +19,24 @@
|
|||||||
package stream;
|
package stream;
|
||||||
|
|
||||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||||
|
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
|
||||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
import org.apache.flink.util.ParameterTool;
|
import org.apache.flink.util.ParameterTool;
|
||||||
|
import stream.contract.ArbitrumOne;
|
||||||
import stream.dto.*;
|
import stream.dto.*;
|
||||||
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||||
|
import stream.io.TokenElaborator;
|
||||||
import stream.source.eventlog.EventLogSourceFactory;
|
import stream.source.eventlog.EventLogSourceFactory;
|
||||||
import stream.source.newheads.NewHeadsSourceFactory;
|
import stream.source.newheads.NewHeadsSourceFactory;
|
||||||
|
import stream.dto.AddressId;
|
||||||
|
import stream.dto.Token;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@@ -57,6 +64,7 @@ public class DataStreamJob {
|
|||||||
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
|
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
|
||||||
|
|
||||||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||||
|
env.getConfig().setGlobalJobParameters(parameters);
|
||||||
// do not do this until considering how secrets are handled by flink
|
// do not do this until considering how secrets are handled by flink
|
||||||
// env.getConfig().setGlobalJobParameters(parameters);
|
// env.getConfig().setGlobalJobParameters(parameters);
|
||||||
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
|
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
|
||||||
@@ -66,6 +74,14 @@ public class DataStreamJob {
|
|||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
mapper.enable(SerializationFeature.INDENT_OUTPUT);
|
mapper.enable(SerializationFeature.INDENT_OUTPUT);
|
||||||
|
|
||||||
|
// Test token metadata markup
|
||||||
|
// https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/datastream/operators/asyncio/
|
||||||
|
DataStream<AddressId> usdcStream = env.fromData(new AddressId(42161, ArbitrumOne.ADDR_USDC));
|
||||||
|
SingleOutputStreamOperator<Token> elaboratedUsdcStream =
|
||||||
|
AsyncDataStream.unorderedWait(usdcStream, new TokenElaborator(), 10, TimeUnit.MINUTES);
|
||||||
|
// debug print
|
||||||
|
elaboratedUsdcStream.map(token -> {log.info("Token: {}",token); return token.toString();}).print();
|
||||||
|
|
||||||
DataStream<ArbitrumOneBlock> arbitrumHeads = env
|
DataStream<ArbitrumOneBlock> arbitrumHeads = env
|
||||||
.fromSource(
|
.fromSource(
|
||||||
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
|
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
|
||||||
|
|||||||
7
src/main/java/stream/contract/ArbitrumOne.java
Normal file
7
src/main/java/stream/contract/ArbitrumOne.java
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
package stream.contract;
|
||||||
|
|
||||||
|
public class ArbitrumOne {
|
||||||
|
static public final int CHAIN_ID = 42161;
|
||||||
|
|
||||||
|
static public final String ADDR_USDC = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831";
|
||||||
|
}
|
||||||
263
src/main/java/stream/contract/ERC20.java
Normal file
263
src/main/java/stream/contract/ERC20.java
Normal file
@@ -0,0 +1,263 @@
|
|||||||
|
package stream.contract;
|
||||||
|
|
||||||
|
import io.reactivex.Flowable;
|
||||||
|
import java.math.BigInteger;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import org.web3j.abi.EventEncoder;
|
||||||
|
import org.web3j.abi.TypeReference;
|
||||||
|
import org.web3j.abi.datatypes.Address;
|
||||||
|
import org.web3j.abi.datatypes.Event;
|
||||||
|
import org.web3j.abi.datatypes.Function;
|
||||||
|
import org.web3j.abi.datatypes.Type;
|
||||||
|
import org.web3j.abi.datatypes.Utf8String;
|
||||||
|
import org.web3j.abi.datatypes.generated.Uint256;
|
||||||
|
import org.web3j.abi.datatypes.generated.Uint8;
|
||||||
|
import org.web3j.crypto.Credentials;
|
||||||
|
import org.web3j.protocol.Web3j;
|
||||||
|
import org.web3j.protocol.core.DefaultBlockParameter;
|
||||||
|
import org.web3j.protocol.core.RemoteCall;
|
||||||
|
import org.web3j.protocol.core.methods.request.EthFilter;
|
||||||
|
import org.web3j.protocol.core.methods.response.Log;
|
||||||
|
import org.web3j.protocol.core.methods.response.TransactionReceipt;
|
||||||
|
import org.web3j.tx.Contract;
|
||||||
|
import org.web3j.tx.TransactionManager;
|
||||||
|
import org.web3j.tx.gas.ContractGasProvider;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Auto generated code.
|
||||||
|
* <p><strong>Do not modify!</strong>
|
||||||
|
* <p>Please use the <a href="https://docs.web3j.io/command_line.html">web3j command line tools</a>,
|
||||||
|
* or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the
|
||||||
|
* <a href="https://github.com/hyperledger/web3j/tree/main/codegen">codegen module</a> to update.
|
||||||
|
*
|
||||||
|
* <p>Generated with web3j version 4.1.1.
|
||||||
|
*/
|
||||||
|
public class ERC20 extends Contract {
|
||||||
|
private static final String BINARY = "Bin file was not provided";
|
||||||
|
|
||||||
|
public static final String FUNC_NAME = "name";
|
||||||
|
|
||||||
|
public static final String FUNC_APPROVE = "approve";
|
||||||
|
|
||||||
|
public static final String FUNC_TOTALSUPPLY = "totalSupply";
|
||||||
|
|
||||||
|
public static final String FUNC_TRANSFERFROM = "transferFrom";
|
||||||
|
|
||||||
|
public static final String FUNC_DECIMALS = "decimals";
|
||||||
|
|
||||||
|
public static final String FUNC_BALANCEOF = "balanceOf";
|
||||||
|
|
||||||
|
public static final String FUNC_SYMBOL = "symbol";
|
||||||
|
|
||||||
|
public static final String FUNC_TRANSFER = "transfer";
|
||||||
|
|
||||||
|
public static final String FUNC_ALLOWANCE = "allowance";
|
||||||
|
|
||||||
|
public static final Event TRANSFER_EVENT = new Event("Transfer",
|
||||||
|
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
|
||||||
|
;
|
||||||
|
|
||||||
|
public static final Event APPROVAL_EVENT = new Event("Approval",
|
||||||
|
Arrays.<TypeReference<?>>asList(new TypeReference<Address>(true) {}, new TypeReference<Address>(true) {}, new TypeReference<Uint256>() {}));
|
||||||
|
;
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
|
||||||
|
super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
|
||||||
|
super(BINARY, contractAddress, web3j, credentials, contractGasProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
|
||||||
|
super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
|
||||||
|
super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<String> name() {
|
||||||
|
final Function function = new Function(FUNC_NAME,
|
||||||
|
Arrays.<Type>asList(),
|
||||||
|
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
|
||||||
|
return executeRemoteCallSingleValueReturn(function, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<TransactionReceipt> approve(String _spender, BigInteger _value) {
|
||||||
|
final Function function = new Function(
|
||||||
|
FUNC_APPROVE,
|
||||||
|
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_spender),
|
||||||
|
new org.web3j.abi.datatypes.generated.Uint256(_value)),
|
||||||
|
Collections.<TypeReference<?>>emptyList());
|
||||||
|
return executeRemoteCallTransaction(function);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<BigInteger> totalSupply() {
|
||||||
|
final Function function = new Function(FUNC_TOTALSUPPLY,
|
||||||
|
Arrays.<Type>asList(),
|
||||||
|
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||||
|
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<TransactionReceipt> transferFrom(String _from, String _to, BigInteger _value) {
|
||||||
|
final Function function = new Function(
|
||||||
|
FUNC_TRANSFERFROM,
|
||||||
|
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_from),
|
||||||
|
new org.web3j.abi.datatypes.Address(_to),
|
||||||
|
new org.web3j.abi.datatypes.generated.Uint256(_value)),
|
||||||
|
Collections.<TypeReference<?>>emptyList());
|
||||||
|
return executeRemoteCallTransaction(function);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<BigInteger> decimals() {
|
||||||
|
final Function function = new Function(FUNC_DECIMALS,
|
||||||
|
Arrays.<Type>asList(),
|
||||||
|
Arrays.<TypeReference<?>>asList(new TypeReference<Uint8>() {}));
|
||||||
|
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<BigInteger> balanceOf(String _owner) {
|
||||||
|
final Function function = new Function(FUNC_BALANCEOF,
|
||||||
|
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner)),
|
||||||
|
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||||
|
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<String> symbol() {
|
||||||
|
final Function function = new Function(FUNC_SYMBOL,
|
||||||
|
Arrays.<Type>asList(),
|
||||||
|
Arrays.<TypeReference<?>>asList(new TypeReference<Utf8String>() {}));
|
||||||
|
return executeRemoteCallSingleValueReturn(function, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<TransactionReceipt> transfer(String _to, BigInteger _value) {
|
||||||
|
final Function function = new Function(
|
||||||
|
FUNC_TRANSFER,
|
||||||
|
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_to),
|
||||||
|
new org.web3j.abi.datatypes.generated.Uint256(_value)),
|
||||||
|
Collections.<TypeReference<?>>emptyList());
|
||||||
|
return executeRemoteCallTransaction(function);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RemoteCall<BigInteger> allowance(String _owner, String _spender) {
|
||||||
|
final Function function = new Function(FUNC_ALLOWANCE,
|
||||||
|
Arrays.<Type>asList(new org.web3j.abi.datatypes.Address(_owner),
|
||||||
|
new org.web3j.abi.datatypes.Address(_spender)),
|
||||||
|
Arrays.<TypeReference<?>>asList(new TypeReference<Uint256>() {}));
|
||||||
|
return executeRemoteCallSingleValueReturn(function, BigInteger.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<TransferEventResponse> getTransferEvents(TransactionReceipt transactionReceipt) {
|
||||||
|
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(TRANSFER_EVENT, transactionReceipt);
|
||||||
|
ArrayList<TransferEventResponse> responses = new ArrayList<TransferEventResponse>(valueList.size());
|
||||||
|
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||||
|
TransferEventResponse typedResponse = new TransferEventResponse();
|
||||||
|
typedResponse.log = eventValues.getLog();
|
||||||
|
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||||
|
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||||
|
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||||
|
responses.add(typedResponse);
|
||||||
|
}
|
||||||
|
return responses;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Flowable<TransferEventResponse> transferEventFlowable(EthFilter filter) {
|
||||||
|
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, TransferEventResponse>() {
|
||||||
|
@Override
|
||||||
|
public TransferEventResponse apply(Log log) {
|
||||||
|
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(TRANSFER_EVENT, log);
|
||||||
|
TransferEventResponse typedResponse = new TransferEventResponse();
|
||||||
|
typedResponse.log = log;
|
||||||
|
typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||||
|
typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||||
|
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||||
|
return typedResponse;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public Flowable<TransferEventResponse> transferEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
|
||||||
|
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||||
|
filter.addSingleTopic(EventEncoder.encode(TRANSFER_EVENT));
|
||||||
|
return transferEventFlowable(filter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ApprovalEventResponse> getApprovalEvents(TransactionReceipt transactionReceipt) {
|
||||||
|
List<Contract.EventValuesWithLog> valueList = extractEventParametersWithLog(APPROVAL_EVENT, transactionReceipt);
|
||||||
|
ArrayList<ApprovalEventResponse> responses = new ArrayList<ApprovalEventResponse>(valueList.size());
|
||||||
|
for (Contract.EventValuesWithLog eventValues : valueList) {
|
||||||
|
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
|
||||||
|
typedResponse.log = eventValues.getLog();
|
||||||
|
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||||
|
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||||
|
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||||
|
responses.add(typedResponse);
|
||||||
|
}
|
||||||
|
return responses;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Flowable<ApprovalEventResponse> approvalEventFlowable(EthFilter filter) {
|
||||||
|
return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function<Log, ApprovalEventResponse>() {
|
||||||
|
@Override
|
||||||
|
public ApprovalEventResponse apply(Log log) {
|
||||||
|
Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(APPROVAL_EVENT, log);
|
||||||
|
ApprovalEventResponse typedResponse = new ApprovalEventResponse();
|
||||||
|
typedResponse.log = log;
|
||||||
|
typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue();
|
||||||
|
typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue();
|
||||||
|
typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue();
|
||||||
|
return typedResponse;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public Flowable<ApprovalEventResponse> approvalEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
|
||||||
|
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
|
||||||
|
filter.addSingleTopic(EventEncoder.encode(APPROVAL_EVENT));
|
||||||
|
return approvalEventFlowable(filter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) {
|
||||||
|
return new ERC20(contractAddress, web3j, credentials, gasPrice, gasLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) {
|
||||||
|
return new ERC20(contractAddress, web3j, transactionManager, gasPrice, gasLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) {
|
||||||
|
return new ERC20(contractAddress, web3j, credentials, contractGasProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) {
|
||||||
|
return new ERC20(contractAddress, web3j, transactionManager, contractGasProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TransferEventResponse {
|
||||||
|
public Log log;
|
||||||
|
|
||||||
|
public String _from;
|
||||||
|
|
||||||
|
public String _to;
|
||||||
|
|
||||||
|
public BigInteger _value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ApprovalEventResponse {
|
||||||
|
public Log log;
|
||||||
|
|
||||||
|
public String _owner;
|
||||||
|
|
||||||
|
public String _spender;
|
||||||
|
|
||||||
|
public BigInteger _value;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,4 +2,11 @@ package stream.dto;
|
|||||||
|
|
||||||
public class AddressId extends ChainId {
|
public class AddressId extends ChainId {
|
||||||
public String address;
|
public String address;
|
||||||
|
|
||||||
|
public AddressId() {}
|
||||||
|
|
||||||
|
public AddressId(int chainId, String address) {
|
||||||
|
super(chainId);
|
||||||
|
this.address = address;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,4 +4,10 @@ import java.io.Serializable;
|
|||||||
|
|
||||||
public class ChainId implements Serializable {
|
public class ChainId implements Serializable {
|
||||||
public int chainId;
|
public int chainId;
|
||||||
|
|
||||||
|
public ChainId() {}
|
||||||
|
|
||||||
|
public ChainId(int chainId) {
|
||||||
|
this.chainId = chainId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package stream.dto;
|
package stream.dto;
|
||||||
|
|
||||||
enum Exchange {
|
public enum Exchange {
|
||||||
UNISWAP_V2,
|
UNISWAP_V2,
|
||||||
UNISWAP_V3,
|
UNISWAP_V3,
|
||||||
UNISWAP_V4,
|
UNISWAP_V4,
|
||||||
|
|||||||
@@ -15,4 +15,16 @@ public class Swap extends ChainId {
|
|||||||
String makerAsset; // token address
|
String makerAsset; // token address
|
||||||
BigDecimal amount; // positive means the taker bought; negative means the taker sold.
|
BigDecimal amount; // positive means the taker bought; negative means the taker sold.
|
||||||
BigDecimal price;
|
BigDecimal price;
|
||||||
|
|
||||||
|
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
|
||||||
|
BigDecimal amount, BigDecimal price) {
|
||||||
|
super(chainId);
|
||||||
|
this.time = time;
|
||||||
|
this.exchange = exchange;
|
||||||
|
this.pool = pool;
|
||||||
|
this.takerAsset = takerAsset;
|
||||||
|
this.makerAsset = makerAsset;
|
||||||
|
this.amount = amount;
|
||||||
|
this.price = price;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,22 @@ public class Token extends AddressId {
|
|||||||
@Serial
|
@Serial
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
String name;
|
public String name;
|
||||||
String symbol;
|
public String symbol;
|
||||||
int decimals;
|
public int decimals;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public Token() {}
|
||||||
|
|
||||||
|
public Token(int chainId, String address, String name, String symbol, int decimals) {
|
||||||
|
super(chainId, address);
|
||||||
|
this.name = name;
|
||||||
|
this.symbol = symbol;
|
||||||
|
this.decimals = decimals;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("Token[%s \"%s\" (%s) .%d]", this.address, this.name, this.symbol, this.decimals);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
package stream.io;
|
package stream.io;
|
||||||
|
|
||||||
|
import java.util.HexFormat;
|
||||||
|
|
||||||
import org.bouncycastle.jcajce.provider.digest.Keccak;
|
import org.bouncycastle.jcajce.provider.digest.Keccak;
|
||||||
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
|
|
||||||
|
|
||||||
public class EthUtils {
|
public class EthUtils {
|
||||||
public static boolean isValidAddress(String address) {
|
public static boolean isValidAddress(String address) {
|
||||||
@@ -35,6 +37,6 @@ public class EthUtils {
|
|||||||
|
|
||||||
public static String keccak(String message) {
|
public static String keccak(String message) {
|
||||||
byte[] hash = new Keccak.Digest256().digest(message.getBytes());
|
byte[] hash = new Keccak.Digest256().digest(message.getBytes());
|
||||||
return "0x"+ByteUtils.toHexString(hash);
|
return "0x" + HexFormat.of().formatHex(hash);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
34
src/main/java/stream/io/JedisClient.java
Normal file
34
src/main/java/stream/io/JedisClient.java
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import redis.clients.jedis.ConnectionPoolConfig;
|
||||||
|
import redis.clients.jedis.JedisPooled;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class JedisClient {
|
||||||
|
protected static volatile JedisPooled jedis = null;
|
||||||
|
private static final Object jedisLock = new Object();
|
||||||
|
|
||||||
|
static public JedisPooled get(Map<String, String> params) {
|
||||||
|
if (jedis == null) {
|
||||||
|
synchronized (jedisLock) {
|
||||||
|
String url = params.getOrDefault("redis_url", "http://localhost:6379");
|
||||||
|
int connections = Integer.parseInt(params.getOrDefault("redis_connections", "8"));
|
||||||
|
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
|
||||||
|
poolConfig.setMaxTotal(connections);
|
||||||
|
poolConfig.setMaxIdle(connections);
|
||||||
|
poolConfig.setMinIdle(0);
|
||||||
|
poolConfig.setBlockWhenExhausted(true);
|
||||||
|
poolConfig.setMaxWait(Duration.ofSeconds(5));
|
||||||
|
// Enables sending a PING command periodically while the connection is idle.
|
||||||
|
poolConfig.setTestWhileIdle(true);
|
||||||
|
// controls the period between checks for idle connections in the pool
|
||||||
|
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(10));
|
||||||
|
jedis = new JedisPooled(poolConfig, url);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return jedis;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
66
src/main/java/stream/io/TokenElaborator.java
Normal file
66
src/main/java/stream/io/TokenElaborator.java
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.OpenContext;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.web3j.protocol.Web3j;
|
||||||
|
import org.web3j.tx.ReadonlyTransactionManager;
|
||||||
|
import org.web3j.tx.gas.DefaultGasProvider;
|
||||||
|
import stream.contract.ERC20;
|
||||||
|
import stream.dto.AddressId;
|
||||||
|
import stream.dto.Token;
|
||||||
|
|
||||||
|
import java.math.BigInteger;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
|
||||||
|
public class TokenElaborator extends RichAsyncFunction<AddressId, Token> {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(TokenElaborator.class);
|
||||||
|
private transient Web3j w3;
|
||||||
|
private DefaultGasProvider gasProvider;
|
||||||
|
private ReadonlyTransactionManager transactionManager;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(OpenContext openContext) throws Exception {
|
||||||
|
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
|
||||||
|
w3 = Web3Client.get(params);
|
||||||
|
gasProvider = new DefaultGasProvider();
|
||||||
|
transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncInvoke(AddressId address, final ResultFuture<Token> resultFuture) {
|
||||||
|
log.info("Starting token processing for address: {} on chain: {}", address.address, address.chainId);
|
||||||
|
CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
return processToken(address);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to process token: {} on chain: {}", address.address, address.chainId, e);
|
||||||
|
throw new RuntimeException("Error processing token: " + address, e);
|
||||||
|
}
|
||||||
|
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Token processToken(AddressId address) {
|
||||||
|
String name;
|
||||||
|
String symbol;
|
||||||
|
int decimals;
|
||||||
|
ERC20 contract = ERC20.load(address.address, w3, transactionManager, gasProvider);
|
||||||
|
try {
|
||||||
|
CompletableFuture<BigInteger> decimalsAsync = contract.decimals().sendAsync();
|
||||||
|
CompletableFuture<String> nameAsync = contract.name().sendAsync();
|
||||||
|
CompletableFuture<String> symbolAsync = contract.symbol().sendAsync();
|
||||||
|
decimals = decimalsAsync.get().intValue();
|
||||||
|
name = nameAsync.get();
|
||||||
|
symbol = symbolAsync.get();
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return new Token(address.chainId, address.address, name, symbol, decimals);
|
||||||
|
}
|
||||||
|
}
|
||||||
45
src/main/java/stream/io/Web3Client.java
Normal file
45
src/main/java/stream/io/Web3Client.java
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import okhttp3.ConnectionPool;
|
||||||
|
import okhttp3.OkHttpClient;
|
||||||
|
import org.web3j.protocol.Web3j;
|
||||||
|
import org.web3j.protocol.http.HttpService;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static okhttp3.ConnectionSpec.CLEARTEXT;
|
||||||
|
import static okhttp3.ConnectionSpec.MODERN_TLS;
|
||||||
|
|
||||||
|
public class Web3Client {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(Web3Client.class);
|
||||||
|
private static volatile Web3j w3 = null;
|
||||||
|
private static final Object w3Lock = new Object();
|
||||||
|
|
||||||
|
static public Web3j get(Map<String, String> params) {
|
||||||
|
if (w3==null) {
|
||||||
|
synchronized (w3Lock) {
|
||||||
|
log.info("Initializing Web3 client");
|
||||||
|
String url = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||||
|
int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5"));
|
||||||
|
int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60"));
|
||||||
|
Duration timeout = Duration.ofSeconds(30);
|
||||||
|
log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds",
|
||||||
|
url, maxIdleConnections, keepAlive, timeout.getSeconds());
|
||||||
|
var httpClient = new OkHttpClient.Builder()
|
||||||
|
.connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT))
|
||||||
|
.connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout)
|
||||||
|
.connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES))
|
||||||
|
.build();
|
||||||
|
w3 = Web3j.build(new HttpService(url, httpClient));
|
||||||
|
log.info("Web3 client initialized successfully");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return w3;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user