diff --git a/pom.xml b/pom.xml index 8efea7c..19b412e 100644 --- a/pom.xml +++ b/pom.xml @@ -35,9 +35,11 @@ under the License. 2.12 2.1.0 2.24.1 - 5.0.0 + 5.0.1 5.3.1 1.6 + 6.1.0 + 2.17.1 @@ -55,7 +57,23 @@ under the License. - + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + + + org.apache.flink @@ -132,7 +150,14 @@ under the License. ${log4j.version} runtime - + + + redis.clients + jedis + ${jedis.version} + + + @@ -143,8 +168,8 @@ under the License. maven-compiler-plugin 3.1 - 16 - 16 + ${target.java.version} + ${target.java.version} diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index 7b5767d..7687846 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -19,17 +19,24 @@ package stream; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.ParameterTool; +import stream.contract.ArbitrumOne; import stream.dto.*; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import stream.io.TokenElaborator; import stream.source.eventlog.EventLogSourceFactory; import stream.source.newheads.NewHeadsSourceFactory; +import stream.dto.AddressId; +import stream.dto.Token; import java.io.IOException; import java.net.URI; import java.util.Collections; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +64,7 @@ public class DataStreamJob { .mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setGlobalJobParameters(parameters); // do not do this until considering how secrets are handled by flink // env.getConfig().setGlobalJobParameters(parameters); URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545")); @@ -66,6 +74,14 @@ public class DataStreamJob { ObjectMapper mapper = new ObjectMapper(); mapper.enable(SerializationFeature.INDENT_OUTPUT); + // Test token metadata markup + // https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/datastream/operators/asyncio/ + DataStream usdcStream = env.fromData(new AddressId(42161, ArbitrumOne.ADDR_USDC)); + SingleOutputStreamOperator elaboratedUsdcStream = + AsyncDataStream.unorderedWait(usdcStream, new TokenElaborator(), 10, TimeUnit.MINUTES); + // debug print + elaboratedUsdcStream.map(token -> {log.info("Token: {}",token); return token.toString();}).print(); + DataStream arbitrumHeads = env .fromSource( NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class), diff --git a/src/main/java/stream/contract/ArbitrumOne.java b/src/main/java/stream/contract/ArbitrumOne.java new file mode 100644 index 0000000..a5541b5 --- /dev/null +++ b/src/main/java/stream/contract/ArbitrumOne.java @@ -0,0 +1,7 @@ +package stream.contract; + +public class ArbitrumOne { + static public final int CHAIN_ID = 42161; + + static public final String ADDR_USDC = "0xaf88d065e77c8cC2239327C5EDb3A432268e5831"; +} diff --git a/src/main/java/stream/contract/ERC20.java b/src/main/java/stream/contract/ERC20.java new file mode 100644 index 0000000..d9014f4 --- /dev/null +++ b/src/main/java/stream/contract/ERC20.java @@ -0,0 +1,263 @@ +package stream.contract; + +import io.reactivex.Flowable; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.web3j.abi.EventEncoder; +import org.web3j.abi.TypeReference; +import org.web3j.abi.datatypes.Address; +import org.web3j.abi.datatypes.Event; +import org.web3j.abi.datatypes.Function; +import org.web3j.abi.datatypes.Type; +import org.web3j.abi.datatypes.Utf8String; +import org.web3j.abi.datatypes.generated.Uint256; +import org.web3j.abi.datatypes.generated.Uint8; +import org.web3j.crypto.Credentials; +import org.web3j.protocol.Web3j; +import org.web3j.protocol.core.DefaultBlockParameter; +import org.web3j.protocol.core.RemoteCall; +import org.web3j.protocol.core.methods.request.EthFilter; +import org.web3j.protocol.core.methods.response.Log; +import org.web3j.protocol.core.methods.response.TransactionReceipt; +import org.web3j.tx.Contract; +import org.web3j.tx.TransactionManager; +import org.web3j.tx.gas.ContractGasProvider; + +/** + *

Auto generated code. + *

Do not modify! + *

Please use the web3j command line tools, + * or the org.web3j.codegen.SolidityFunctionWrapperGenerator in the + * codegen module to update. + * + *

Generated with web3j version 4.1.1. + */ +public class ERC20 extends Contract { + private static final String BINARY = "Bin file was not provided"; + + public static final String FUNC_NAME = "name"; + + public static final String FUNC_APPROVE = "approve"; + + public static final String FUNC_TOTALSUPPLY = "totalSupply"; + + public static final String FUNC_TRANSFERFROM = "transferFrom"; + + public static final String FUNC_DECIMALS = "decimals"; + + public static final String FUNC_BALANCEOF = "balanceOf"; + + public static final String FUNC_SYMBOL = "symbol"; + + public static final String FUNC_TRANSFER = "transfer"; + + public static final String FUNC_ALLOWANCE = "allowance"; + + public static final Event TRANSFER_EVENT = new Event("Transfer", + Arrays.>asList(new TypeReference

(true) {}, new TypeReference
(true) {}, new TypeReference() {})); + ; + + public static final Event APPROVAL_EVENT = new Event("Approval", + Arrays.>asList(new TypeReference
(true) {}, new TypeReference
(true) {}, new TypeReference() {})); + ; + + @Deprecated + protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) { + super(BINARY, contractAddress, web3j, credentials, gasPrice, gasLimit); + } + + protected ERC20(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) { + super(BINARY, contractAddress, web3j, credentials, contractGasProvider); + } + + @Deprecated + protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) { + super(BINARY, contractAddress, web3j, transactionManager, gasPrice, gasLimit); + } + + protected ERC20(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) { + super(BINARY, contractAddress, web3j, transactionManager, contractGasProvider); + } + + public RemoteCall name() { + final Function function = new Function(FUNC_NAME, + Arrays.asList(), + Arrays.>asList(new TypeReference() {})); + return executeRemoteCallSingleValueReturn(function, String.class); + } + + public RemoteCall approve(String _spender, BigInteger _value) { + final Function function = new Function( + FUNC_APPROVE, + Arrays.asList(new org.web3j.abi.datatypes.Address(_spender), + new org.web3j.abi.datatypes.generated.Uint256(_value)), + Collections.>emptyList()); + return executeRemoteCallTransaction(function); + } + + public RemoteCall totalSupply() { + final Function function = new Function(FUNC_TOTALSUPPLY, + Arrays.asList(), + Arrays.>asList(new TypeReference() {})); + return executeRemoteCallSingleValueReturn(function, BigInteger.class); + } + + public RemoteCall transferFrom(String _from, String _to, BigInteger _value) { + final Function function = new Function( + FUNC_TRANSFERFROM, + Arrays.asList(new org.web3j.abi.datatypes.Address(_from), + new org.web3j.abi.datatypes.Address(_to), + new org.web3j.abi.datatypes.generated.Uint256(_value)), + Collections.>emptyList()); + return executeRemoteCallTransaction(function); + } + + public RemoteCall decimals() { + final Function function = new Function(FUNC_DECIMALS, + Arrays.asList(), + Arrays.>asList(new TypeReference() {})); + return executeRemoteCallSingleValueReturn(function, BigInteger.class); + } + + public RemoteCall balanceOf(String _owner) { + final Function function = new Function(FUNC_BALANCEOF, + Arrays.asList(new org.web3j.abi.datatypes.Address(_owner)), + Arrays.>asList(new TypeReference() {})); + return executeRemoteCallSingleValueReturn(function, BigInteger.class); + } + + public RemoteCall symbol() { + final Function function = new Function(FUNC_SYMBOL, + Arrays.asList(), + Arrays.>asList(new TypeReference() {})); + return executeRemoteCallSingleValueReturn(function, String.class); + } + + public RemoteCall transfer(String _to, BigInteger _value) { + final Function function = new Function( + FUNC_TRANSFER, + Arrays.asList(new org.web3j.abi.datatypes.Address(_to), + new org.web3j.abi.datatypes.generated.Uint256(_value)), + Collections.>emptyList()); + return executeRemoteCallTransaction(function); + } + + public RemoteCall allowance(String _owner, String _spender) { + final Function function = new Function(FUNC_ALLOWANCE, + Arrays.asList(new org.web3j.abi.datatypes.Address(_owner), + new org.web3j.abi.datatypes.Address(_spender)), + Arrays.>asList(new TypeReference() {})); + return executeRemoteCallSingleValueReturn(function, BigInteger.class); + } + + public List getTransferEvents(TransactionReceipt transactionReceipt) { + List valueList = extractEventParametersWithLog(TRANSFER_EVENT, transactionReceipt); + ArrayList responses = new ArrayList(valueList.size()); + for (Contract.EventValuesWithLog eventValues : valueList) { + TransferEventResponse typedResponse = new TransferEventResponse(); + typedResponse.log = eventValues.getLog(); + typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue(); + typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue(); + typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue(); + responses.add(typedResponse); + } + return responses; + } + + public Flowable transferEventFlowable(EthFilter filter) { + return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function() { + @Override + public TransferEventResponse apply(Log log) { + Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(TRANSFER_EVENT, log); + TransferEventResponse typedResponse = new TransferEventResponse(); + typedResponse.log = log; + typedResponse._from = (String) eventValues.getIndexedValues().get(0).getValue(); + typedResponse._to = (String) eventValues.getIndexedValues().get(1).getValue(); + typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue(); + return typedResponse; + } + }); + } + + public Flowable transferEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) { + EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress()); + filter.addSingleTopic(EventEncoder.encode(TRANSFER_EVENT)); + return transferEventFlowable(filter); + } + + public List getApprovalEvents(TransactionReceipt transactionReceipt) { + List valueList = extractEventParametersWithLog(APPROVAL_EVENT, transactionReceipt); + ArrayList responses = new ArrayList(valueList.size()); + for (Contract.EventValuesWithLog eventValues : valueList) { + ApprovalEventResponse typedResponse = new ApprovalEventResponse(); + typedResponse.log = eventValues.getLog(); + typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue(); + typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue(); + typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue(); + responses.add(typedResponse); + } + return responses; + } + + public Flowable approvalEventFlowable(EthFilter filter) { + return web3j.ethLogFlowable(filter).map(new io.reactivex.functions.Function() { + @Override + public ApprovalEventResponse apply(Log log) { + Contract.EventValuesWithLog eventValues = extractEventParametersWithLog(APPROVAL_EVENT, log); + ApprovalEventResponse typedResponse = new ApprovalEventResponse(); + typedResponse.log = log; + typedResponse._owner = (String) eventValues.getIndexedValues().get(0).getValue(); + typedResponse._spender = (String) eventValues.getIndexedValues().get(1).getValue(); + typedResponse._value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue(); + return typedResponse; + } + }); + } + + public Flowable approvalEventFlowable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) { + EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress()); + filter.addSingleTopic(EventEncoder.encode(APPROVAL_EVENT)); + return approvalEventFlowable(filter); + } + + @Deprecated + public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, BigInteger gasPrice, BigInteger gasLimit) { + return new ERC20(contractAddress, web3j, credentials, gasPrice, gasLimit); + } + + @Deprecated + public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, BigInteger gasPrice, BigInteger gasLimit) { + return new ERC20(contractAddress, web3j, transactionManager, gasPrice, gasLimit); + } + + public static ERC20 load(String contractAddress, Web3j web3j, Credentials credentials, ContractGasProvider contractGasProvider) { + return new ERC20(contractAddress, web3j, credentials, contractGasProvider); + } + + public static ERC20 load(String contractAddress, Web3j web3j, TransactionManager transactionManager, ContractGasProvider contractGasProvider) { + return new ERC20(contractAddress, web3j, transactionManager, contractGasProvider); + } + + public static class TransferEventResponse { + public Log log; + + public String _from; + + public String _to; + + public BigInteger _value; + } + + public static class ApprovalEventResponse { + public Log log; + + public String _owner; + + public String _spender; + + public BigInteger _value; + } +} diff --git a/src/main/java/stream/dto/AddressId.java b/src/main/java/stream/dto/AddressId.java index 8aefca3..2a668da 100644 --- a/src/main/java/stream/dto/AddressId.java +++ b/src/main/java/stream/dto/AddressId.java @@ -2,4 +2,11 @@ package stream.dto; public class AddressId extends ChainId { public String address; + + public AddressId() {} + + public AddressId(int chainId, String address) { + super(chainId); + this.address = address; + } } diff --git a/src/main/java/stream/dto/ChainId.java b/src/main/java/stream/dto/ChainId.java index e0b45b6..16d9166 100644 --- a/src/main/java/stream/dto/ChainId.java +++ b/src/main/java/stream/dto/ChainId.java @@ -4,4 +4,10 @@ import java.io.Serializable; public class ChainId implements Serializable { public int chainId; + + public ChainId() {} + + public ChainId(int chainId) { + this.chainId = chainId; + } } diff --git a/src/main/java/stream/dto/Exchange.java b/src/main/java/stream/dto/Exchange.java index 1ae637e..863f3cb 100644 --- a/src/main/java/stream/dto/Exchange.java +++ b/src/main/java/stream/dto/Exchange.java @@ -1,6 +1,6 @@ package stream.dto; -enum Exchange { +public enum Exchange { UNISWAP_V2, UNISWAP_V3, UNISWAP_V4, diff --git a/src/main/java/stream/dto/Swap.java b/src/main/java/stream/dto/Swap.java index a7b0629..1d88358 100644 --- a/src/main/java/stream/dto/Swap.java +++ b/src/main/java/stream/dto/Swap.java @@ -15,4 +15,16 @@ public class Swap extends ChainId { String makerAsset; // token address BigDecimal amount; // positive means the taker bought; negative means the taker sold. BigDecimal price; + + public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset, + BigDecimal amount, BigDecimal price) { + super(chainId); + this.time = time; + this.exchange = exchange; + this.pool = pool; + this.takerAsset = takerAsset; + this.makerAsset = makerAsset; + this.amount = amount; + this.price = price; + } } diff --git a/src/main/java/stream/dto/Token.java b/src/main/java/stream/dto/Token.java index 6dcde69..792dfd6 100644 --- a/src/main/java/stream/dto/Token.java +++ b/src/main/java/stream/dto/Token.java @@ -6,7 +6,22 @@ public class Token extends AddressId { @Serial private static final long serialVersionUID = 1L; - String name; - String symbol; - int decimals; + public String name; + public String symbol; + public int decimals; + + @SuppressWarnings("unused") + public Token() {} + + public Token(int chainId, String address, String name, String symbol, int decimals) { + super(chainId, address); + this.name = name; + this.symbol = symbol; + this.decimals = decimals; + } + + @Override + public String toString() { + return String.format("Token[%s \"%s\" (%s) .%d]", this.address, this.name, this.symbol, this.decimals); + } } diff --git a/src/main/java/stream/io/EthUtils.java b/src/main/java/stream/io/EthUtils.java index 0ee8d7f..9ff6aea 100644 --- a/src/main/java/stream/io/EthUtils.java +++ b/src/main/java/stream/io/EthUtils.java @@ -1,7 +1,9 @@ package stream.io; +import java.util.HexFormat; + import org.bouncycastle.jcajce.provider.digest.Keccak; -import org.bouncycastle.pqc.math.linearalgebra.ByteUtils; + public class EthUtils { public static boolean isValidAddress(String address) { @@ -35,6 +37,6 @@ public class EthUtils { public static String keccak(String message) { byte[] hash = new Keccak.Digest256().digest(message.getBytes()); - return "0x"+ByteUtils.toHexString(hash); + return "0x" + HexFormat.of().formatHex(hash); } } diff --git a/src/main/java/stream/io/JedisClient.java b/src/main/java/stream/io/JedisClient.java new file mode 100644 index 0000000..b28f6c0 --- /dev/null +++ b/src/main/java/stream/io/JedisClient.java @@ -0,0 +1,34 @@ +package stream.io; + +import redis.clients.jedis.ConnectionPoolConfig; +import redis.clients.jedis.JedisPooled; + +import java.time.Duration; +import java.util.Map; + +public class JedisClient { + protected static volatile JedisPooled jedis = null; + private static final Object jedisLock = new Object(); + + static public JedisPooled get(Map params) { + if (jedis == null) { + synchronized (jedisLock) { + String url = params.getOrDefault("redis_url", "http://localhost:6379"); + int connections = Integer.parseInt(params.getOrDefault("redis_connections", "8")); + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(connections); + poolConfig.setMaxIdle(connections); + poolConfig.setMinIdle(0); + poolConfig.setBlockWhenExhausted(true); + poolConfig.setMaxWait(Duration.ofSeconds(5)); + // Enables sending a PING command periodically while the connection is idle. + poolConfig.setTestWhileIdle(true); + // controls the period between checks for idle connections in the pool + poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(10)); + jedis = new JedisPooled(poolConfig, url); + } + } + return jedis; + } + +} diff --git a/src/main/java/stream/io/TokenElaborator.java b/src/main/java/stream/io/TokenElaborator.java new file mode 100644 index 0000000..1c61e4a --- /dev/null +++ b/src/main/java/stream/io/TokenElaborator.java @@ -0,0 +1,66 @@ +package stream.io; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.web3j.protocol.Web3j; +import org.web3j.tx.ReadonlyTransactionManager; +import org.web3j.tx.gas.DefaultGasProvider; +import stream.contract.ERC20; +import stream.dto.AddressId; +import stream.dto.Token; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + + +public class TokenElaborator extends RichAsyncFunction { + private static final Logger log = LoggerFactory.getLogger(TokenElaborator.class); + private transient Web3j w3; + private DefaultGasProvider gasProvider; + private ReadonlyTransactionManager transactionManager; + + @Override + public void open(OpenContext openContext) throws Exception { + Map params = getRuntimeContext().getGlobalJobParameters(); + w3 = Web3Client.get(params); + gasProvider = new DefaultGasProvider(); + transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234"); + } + + @Override + public void asyncInvoke(AddressId address, final ResultFuture resultFuture) { + log.info("Starting token processing for address: {} on chain: {}", address.address, address.chainId); + CompletableFuture.supplyAsync(() -> { + try { + return processToken(address); + } catch (Exception e) { + log.error("Failed to process token: {} on chain: {}", address.address, address.chainId, e); + throw new RuntimeException("Error processing token: " + address, e); + } + }).thenAccept(result -> resultFuture.complete(Collections.singleton(result))); + } + + private Token processToken(AddressId address) { + String name; + String symbol; + int decimals; + ERC20 contract = ERC20.load(address.address, w3, transactionManager, gasProvider); + try { + CompletableFuture decimalsAsync = contract.decimals().sendAsync(); + CompletableFuture nameAsync = contract.name().sendAsync(); + CompletableFuture symbolAsync = contract.symbol().sendAsync(); + decimals = decimalsAsync.get().intValue(); + name = nameAsync.get(); + symbol = symbolAsync.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return new Token(address.chainId, address.address, name, symbol, decimals); + } +} diff --git a/src/main/java/stream/io/Web3Client.java b/src/main/java/stream/io/Web3Client.java new file mode 100644 index 0000000..5f96b69 --- /dev/null +++ b/src/main/java/stream/io/Web3Client.java @@ -0,0 +1,45 @@ +package stream.io; + +import okhttp3.ConnectionPool; +import okhttp3.OkHttpClient; +import org.web3j.protocol.Web3j; +import org.web3j.protocol.http.HttpService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static okhttp3.ConnectionSpec.CLEARTEXT; +import static okhttp3.ConnectionSpec.MODERN_TLS; + +public class Web3Client { + private static final Logger log = LoggerFactory.getLogger(Web3Client.class); + private static volatile Web3j w3 = null; + private static final Object w3Lock = new Object(); + + static public Web3j get(Map params) { + if (w3==null) { + synchronized (w3Lock) { + log.info("Initializing Web3 client"); + String url = params.getOrDefault("rpc_url", "http://localhost:8545"); + int maxIdleConnections = Integer.parseInt(params.getOrDefault("max_idle_connections", "5")); + int keepAlive = Integer.parseInt(params.getOrDefault("keep_alive", "60")); + Duration timeout = Duration.ofSeconds(30); + log.debug("Web3 connection parameters - URL: {}, Max idle connections: {}, Keep alive: {} minutes, Timeout: {} seconds", + url, maxIdleConnections, keepAlive, timeout.getSeconds()); + var httpClient = new OkHttpClient.Builder() + .connectionSpecs(Arrays.asList(MODERN_TLS, CLEARTEXT)) + .connectTimeout(timeout).callTimeout(timeout).readTimeout(timeout).writeTimeout(timeout) + .connectionPool(new ConnectionPool(maxIdleConnections, keepAlive, TimeUnit.MINUTES)) + .build(); + w3 = Web3j.build(new HttpService(url, httpClient)); + log.info("Web3 client initialized successfully"); + } + } + return w3; + } + +}