diff --git a/pom.xml b/pom.xml index 96937f2..a85f062 100644 --- a/pom.xml +++ b/pom.xml @@ -31,7 +31,7 @@ under the License. UTF-8 ${target.java.version} ${target.java.version} - 11 + 24 2.12 2.0.0 2.24.1 @@ -135,8 +135,8 @@ under the License. maven-compiler-plugin 3.1 - ${target.java.version} - ${target.java.version} + 16 + 16 diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index 7b6cc84..7b5767d 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -20,9 +20,11 @@ package stream; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.ParameterTool; -import stream.dto.ArbitrumOneBlock; +import stream.dto.*; +import stream.source.eventlog.EventLogSourceFactory; import stream.source.newheads.NewHeadsSourceFactory; import java.io.IOException; @@ -64,15 +66,19 @@ public class DataStreamJob { ObjectMapper mapper = new ObjectMapper(); mapper.enable(SerializationFeature.INDENT_OUTPUT); - DataStream blockStream = env + DataStream arbitrumHeads = env .fromSource( NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class), org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), - "Ethereum Blocks Source", + "ArbitrumOne Head Blocks", TypeInformation.of(ArbitrumOneBlock.class) ); - + + DataStream mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events"); + DataStream burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events"); + // Map the blocks to pretty-printed JSON strings +/* blockStream .map(block -> { try { @@ -83,6 +89,58 @@ public class DataStreamJob { }) .print("New Ethereum Block: "); + transferStream + .map(event -> { + try { + return mapper.writeValueAsString(event); + } catch (Exception e) { + return "Error converting transfer event to JSON: " + e.getMessage(); + } + }) + .print("Transfer Event: "); + + swapStream + .map(event -> { + try { + return mapper.writeValueAsString(event); + } catch (Exception e) { + return "Error converting swap event to JSON: " + e.getMessage(); + } + }) + .print("Swap Event: "); +*/ + + mintStream + .map(event -> { + try { + return mapper.writeValueAsString(event); + } catch (Exception e) { + return "Error converting mint event to JSON: " + e.getMessage(); + } + }) + .print("Mint Event: "); + + burnStream + .map(event -> { + try { + return mapper.writeValueAsString(event); + } catch (Exception e) { + return "Error converting burn event to JSON: " + e.getMessage(); + } + }) + .print("Burn Event: "); + env.execute("Ethereum Block Stream"); } + + private static DataStreamSource getEventStream(StreamExecutionEnvironment env, URI webSocketUri, + String eventSignature, Class eventLogClass, String streamName) { + var eventType = new EventType<>(eventSignature, eventLogClass); + return env.fromSource( + EventLogSourceFactory.createSource(webSocketUri, eventType), + org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), + streamName, + TypeInformation.of(eventType.eventLogClass) + ); + } } \ No newline at end of file diff --git a/src/main/java/stream/dto/AddressId.java b/src/main/java/stream/dto/AddressId.java new file mode 100644 index 0000000..8aefca3 --- /dev/null +++ b/src/main/java/stream/dto/AddressId.java @@ -0,0 +1,5 @@ +package stream.dto; + +public class AddressId extends ChainId { + public String address; +} diff --git a/src/main/java/stream/dto/ArbitrumOneBlock.java b/src/main/java/stream/dto/ArbitrumOneBlock.java index 48f6c4c..fdd8093 100644 --- a/src/main/java/stream/dto/ArbitrumOneBlock.java +++ b/src/main/java/stream/dto/ArbitrumOneBlock.java @@ -1,6 +1,10 @@ package stream.dto; +import java.io.Serial; + public class ArbitrumOneBlock extends EthereumBlock { + @Serial + private static final long serialVersionUID = 1L; public String l1BlockNumber; public String sendRoot; public String sendCount; @@ -9,5 +13,5 @@ public class ArbitrumOneBlock extends EthereumBlock { public String l1Hash; public Long l1Timestamp; public String sequencerAddress; - + } diff --git a/src/main/java/stream/dto/BigInt.java b/src/main/java/stream/dto/BigInt.java new file mode 100644 index 0000000..1c3f206 --- /dev/null +++ b/src/main/java/stream/dto/BigInt.java @@ -0,0 +1,20 @@ +package stream.dto; + +import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import stream.io.BigIntDeserializer; +import stream.io.BigIntSerializer; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.FIELD, ElementType.PARAMETER}) +@Retention(RetentionPolicy.RUNTIME) +@JacksonAnnotationsInside +@JsonSerialize(using = BigIntSerializer.class) +@JsonDeserialize(using = BigIntDeserializer.class) +public @interface BigInt { +} \ No newline at end of file diff --git a/src/main/java/stream/dto/Block.java b/src/main/java/stream/dto/Block.java index dddff13..ec23d74 100644 --- a/src/main/java/stream/dto/Block.java +++ b/src/main/java/stream/dto/Block.java @@ -1,5 +1,11 @@ package stream.dto; -public class Block { +import java.io.Serial; +import java.io.Serializable; + +public class Block implements Serializable { + @Serial + private static final long serialVersionUID = 1L; + public Long chainId; } diff --git a/src/main/java/stream/dto/BlockHash.java b/src/main/java/stream/dto/BlockHash.java index 04e0414..443ec0b 100644 --- a/src/main/java/stream/dto/BlockHash.java +++ b/src/main/java/stream/dto/BlockHash.java @@ -1,6 +1,11 @@ package stream.dto; +import java.io.Serial; + public class BlockHash extends BlockId { + @Serial + private static final long serialVersionUID = 1L; + public String hash; @Override diff --git a/src/main/java/stream/dto/BlockId.java b/src/main/java/stream/dto/BlockId.java index e3a3506..b427a7e 100644 --- a/src/main/java/stream/dto/BlockId.java +++ b/src/main/java/stream/dto/BlockId.java @@ -1,6 +1,10 @@ package stream.dto; +import java.io.Serial; + public abstract class BlockId extends Block { + @Serial + private static final long serialVersionUID = 1L; + abstract public Object getId(); } - diff --git a/src/main/java/stream/dto/BlockNumber.java b/src/main/java/stream/dto/BlockNumber.java index c154d38..6aa9b19 100644 --- a/src/main/java/stream/dto/BlockNumber.java +++ b/src/main/java/stream/dto/BlockNumber.java @@ -1,6 +1,11 @@ package stream.dto; +import java.io.Serial; + public class BlockNumber extends BlockId { + @Serial + private static final long serialVersionUID = 1L; + public long number; @Override diff --git a/src/main/java/stream/dto/BurnEventLog.java b/src/main/java/stream/dto/BurnEventLog.java new file mode 100644 index 0000000..ffa2505 --- /dev/null +++ b/src/main/java/stream/dto/BurnEventLog.java @@ -0,0 +1,39 @@ +package stream.dto; + +import java.io.Serializable; +import java.math.BigInteger; + + +/* +/// @notice Emitted when a position's liquidity is removed +/// @dev Does not withdraw any fees earned by the liquidity position, which must be withdrawn via #collect +/// @param owner The owner of the position for which liquidity is removed +/// @param tickLower The lower tick of the position +/// @param tickUpper The upper tick of the position +/// @param amount The amount of liquidity to remove +/// @param amount0 The amount of token0 withdrawn +/// @param amount1 The amount of token1 withdrawn +event Burn( + address indexed owner, + int24 indexed tickLower, + int24 indexed tickUpper, + uint128 amount, + uint256 amount0, + uint256 amount1 +); +*/ + +public class BurnEventLog extends EventLog implements Serializable { + public static final String SIGNATURE = "Burn(address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)"; + + public String owner; + public int tickLower; + public int tickUpper; + @BigInt + public BigInteger amount; + @BigInt + public BigInteger amount0; + @BigInt + public BigInteger amount1; + +} diff --git a/src/main/java/stream/dto/ChainId.java b/src/main/java/stream/dto/ChainId.java new file mode 100644 index 0000000..e0b45b6 --- /dev/null +++ b/src/main/java/stream/dto/ChainId.java @@ -0,0 +1,7 @@ +package stream.dto; + +import java.io.Serializable; + +public class ChainId implements Serializable { + public int chainId; +} diff --git a/src/main/java/stream/dto/ElaboratedEventType.java b/src/main/java/stream/dto/ElaboratedEventType.java new file mode 100644 index 0000000..0d20e64 --- /dev/null +++ b/src/main/java/stream/dto/ElaboratedEventType.java @@ -0,0 +1,158 @@ +package stream.dto; + +import org.web3j.abi.FunctionReturnDecoder; +import org.web3j.abi.TypeReference; +import org.web3j.abi.datatypes.Address; +import org.web3j.abi.datatypes.Bool; +import org.web3j.abi.datatypes.Type; +import org.web3j.abi.datatypes.Utf8String; +import org.web3j.abi.datatypes.generated.*; + +import java.io.Serial; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import static stream.io.EthUtils.keccak; + +@SuppressWarnings("rawtypes") +public class ElaboratedEventType extends EventType { + + public transient String name; + public transient List paramNames; + public transient List> paramTypes; + public transient String hash; + + private static final String SIGNATURE_REGEX = "^([a-zA-Z_][a-zA-Z0-9_]*)\\((|[^)]*)\\)$"; + private static final String PARAM_REGEX = "([a-zA-Z0-9]+(?:\\[(?:[0-9]*|\\s*)])*)\\s+(?:(indexed)\\s+)?([a-zA-Z_][a-zA-Z0-9_]*)"; + + public ElaboratedEventType(EventType eventType) { + super(eventType.signature, eventType.eventLogClass); + var matcher = Pattern.compile(SIGNATURE_REGEX).matcher(signature); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid event signature format: " + signature); + } + this.name = matcher.group(1); + this.paramNames = new ArrayList<>(); + this.paramTypes = new ArrayList<>(); + var paramTypeNames = new ArrayList(); + + String paramsStr = matcher.group(2).trim(); + if (!paramsStr.isEmpty()) { + var paramMatcher = Pattern.compile(PARAM_REGEX).matcher(paramsStr); + while (paramMatcher.find()) { + String type = paramMatcher.group(1); + boolean indexed = paramMatcher.group(2) != null; + String name = paramMatcher.group(3); + paramTypeNames.add(type); + this.paramNames.add(name); + this.paramTypes.add(TypeReference.create(typeMap.get(type), indexed)); + } + } + String canonical = this.name+"("+String.join(",", paramTypeNames)+")"; + this.hash = keccak(canonical); + } + + public List parseEventData(List topics, String data) { + assert topics.size() == this.paramTypes.stream().filter(TypeReference::isIndexed).count(); + List> nonIndexedParamTypes = this.paramTypes.stream().filter(t -> !t.isIndexed()).toList(); + List dataValues = FunctionReturnDecoder.decode(data, nonIndexedParamTypes).stream().map(Type::getValue).toList(); + int dataValuesIndex = 0; + int topicsIndex = 1; // the first topic is the event signature + List args = new ArrayList<>(); + for (TypeReference paramType : this.paramTypes) { + Object value; + if (paramType.isIndexed()) { + String encoded = topics.get(topicsIndex++); + value = FunctionReturnDecoder.decodeIndexedValue(encoded, paramType).getValue(); + } + else { + value = dataValues.get(dataValuesIndex++); + } + args.add(value); + } + return args; + } + + + private static final Map> typeMap = Map.>ofEntries( + typeEntry("address", Address.class), + typeEntry("bool", Bool.class), + typeEntry("string", Utf8String.class), + typeEntry("uint8", Uint8.class), + typeEntry("uint16", Uint16.class), + typeEntry("uint24", Uint24.class), + typeEntry("uint32", Uint32.class), + typeEntry("uint40", Uint40.class), + typeEntry("uint48", Uint48.class), + typeEntry("uint56", Uint56.class), + typeEntry("uint64", Uint64.class), + typeEntry("uint72", Uint72.class), + typeEntry("uint80", Uint80.class), + typeEntry("uint88", Uint88.class), + typeEntry("uint96", Uint96.class), + typeEntry("uint104", Uint104.class), + typeEntry("uint112", Uint112.class), + typeEntry("uint120", Uint120.class), + typeEntry("uint128", Uint128.class), + typeEntry("uint136", Uint136.class), + typeEntry("uint144", Uint144.class), + typeEntry("uint152", Uint152.class), + typeEntry("uint160", Uint160.class), + typeEntry("uint168", Uint168.class), + typeEntry("uint176", Uint176.class), + typeEntry("uint184", Uint184.class), + typeEntry("uint192", Uint192.class), + typeEntry("uint200", Uint200.class), + typeEntry("uint208", Uint208.class), + typeEntry("uint216", Uint216.class), + typeEntry("uint224", Uint224.class), + typeEntry("uint232", Uint232.class), + typeEntry("uint240", Uint240.class), + typeEntry("uint248", Uint248.class), + typeEntry("uint256", Uint256.class), + typeEntry("int8", Int8.class), + typeEntry("int16", Int16.class), + typeEntry("int24", Int24.class), + typeEntry("int32", Int32.class), + typeEntry("int40", Int40.class), + typeEntry("int48", Int48.class), + typeEntry("int56", Int56.class), + typeEntry("int64", Int64.class), + typeEntry("int72", Int72.class), + typeEntry("int80", Int80.class), + typeEntry("int88", Int88.class), + typeEntry("int96", Int96.class), + typeEntry("int104", Int104.class), + typeEntry("int112", Int112.class), + typeEntry("int120", Int120.class), + typeEntry("int128", Int128.class), + typeEntry("int136", Int136.class), + typeEntry("int144", Int144.class), + typeEntry("int152", Int152.class), + typeEntry("int160", Int160.class), + typeEntry("int168", Int168.class), + typeEntry("int176", Int176.class), + typeEntry("int184", Int184.class), + typeEntry("int192", Int192.class), + typeEntry("int200", Int200.class), + typeEntry("int208", Int208.class), + typeEntry("int216", Int216.class), + typeEntry("int224", Int224.class), + typeEntry("int232", Int232.class), + typeEntry("int240", Int240.class), + typeEntry("int248", Int248.class), + typeEntry("int256", Int256.class) + ); + + private static Map.Entry> typeEntry(String name, Class type) { + //noinspection unchecked + return Map.entry(name, (Class) type); + } + + @Serial + private static final long serialVersionUID = 1L; +} diff --git a/src/main/java/stream/dto/EthereumBlock.java b/src/main/java/stream/dto/EthereumBlock.java index bb8b10a..3ff030e 100644 --- a/src/main/java/stream/dto/EthereumBlock.java +++ b/src/main/java/stream/dto/EthereumBlock.java @@ -5,13 +5,15 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import stream.io.HexLongDeserializer; import stream.io.HexLongSerializer; +import java.io.Serial; import java.util.List; public class EthereumBlock extends Block { + @Serial + private static final long serialVersionUID = 1L; public String hash; - @JsonSerialize(using = HexLongSerializer.class) - @JsonDeserialize(using = HexLongDeserializer.class) + @HexLong public Long number; public String parentHash; public String nonce; @@ -27,9 +29,8 @@ public class EthereumBlock extends Block { public Long size; public String gasLimit; public String gasUsed; - @JsonSerialize(using = HexLongSerializer.class) - @JsonDeserialize(using = HexLongDeserializer.class) + @HexLong public Long timestamp; public List transactions; public List uncles; -} \ No newline at end of file +} diff --git a/src/main/java/stream/dto/EventLog.java b/src/main/java/stream/dto/EventLog.java index 44b67d7..6ddce9a 100644 --- a/src/main/java/stream/dto/EventLog.java +++ b/src/main/java/stream/dto/EventLog.java @@ -1,14 +1,27 @@ package stream.dto; + +import java.io.Serial; +import java.io.Serializable; import java.util.List; -public class EventLog { +public class EventLog implements Serializable { public boolean removed; // true if log was removed due to chain reorg + @HexLong public Long logIndex; + @HexLong public Long transactionIndex; public String transactionHash; public String blockHash; + @HexLong public Long blockNumber; public String address; // contract that emitted the event public String data; // contains the non-indexed parameters public List topics; // contains the event signature and indexed parameters + + // Populate these if args have been parsed out of data and topics + public transient List args = null; + public transient String sender; // from topics[1] + + @Serial + private static final long serialVersionUID = 1L; } diff --git a/src/main/java/stream/dto/EventLogFilter.java b/src/main/java/stream/dto/EventLogFilter.java new file mode 100644 index 0000000..c04dcd1 --- /dev/null +++ b/src/main/java/stream/dto/EventLogFilter.java @@ -0,0 +1,18 @@ + +package stream.dto; + +import java.io.Serial; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class EventLogFilter implements Serializable { + @Serial + private static final long serialVersionUID = 1L; + + public Long fromBlock; // Starting block number (null means latest) + public Long toBlock; // Ending block number (null means latest) + public List addresses = new ArrayList<>(); // Contract addresses to filter + public List topics = new ArrayList<>(); // Event topics to filter (can include the event signature) + public Boolean includeUnfinalized = true; // Whether to include logs from unfinalized blocks +} diff --git a/src/main/java/stream/dto/EventType.java b/src/main/java/stream/dto/EventType.java new file mode 100644 index 0000000..dedec14 --- /dev/null +++ b/src/main/java/stream/dto/EventType.java @@ -0,0 +1,13 @@ +package stream.dto; + +import java.io.Serializable; + +public class EventType implements Serializable { + public String signature; + public Class eventLogClass; + + public EventType(String signature, Class eventLogClass) { + this.signature = signature; + this.eventLogClass = eventLogClass; + } +} diff --git a/src/main/java/stream/dto/HexLong.java b/src/main/java/stream/dto/HexLong.java new file mode 100644 index 0000000..bab2802 --- /dev/null +++ b/src/main/java/stream/dto/HexLong.java @@ -0,0 +1,20 @@ +package stream.dto; + +import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import stream.io.HexLongDeserializer; +import stream.io.HexLongSerializer; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.FIELD, ElementType.PARAMETER}) +@Retention(RetentionPolicy.RUNTIME) +@JacksonAnnotationsInside +@JsonSerialize(using = HexLongSerializer.class) +@JsonDeserialize(using = HexLongDeserializer.class) +public @interface HexLong { +} diff --git a/src/main/java/stream/dto/MintEventLog.java b/src/main/java/stream/dto/MintEventLog.java new file mode 100644 index 0000000..7dee8b4 --- /dev/null +++ b/src/main/java/stream/dto/MintEventLog.java @@ -0,0 +1,41 @@ +package stream.dto; + +import java.io.Serializable; +import java.math.BigInteger; + + +/* +/// @notice Emitted when liquidity is minted for a given position +/// @param sender The address that minted the liquidity +/// @param owner The owner of the position and recipient of any minted liquidity +/// @param tickLower The lower tick of the position +/// @param tickUpper The upper tick of the position +/// @param amount The amount of liquidity minted to the position range +/// @param amount0 How much token0 was required for the minted liquidity +/// @param amount1 How much token1 was required for the minted liquidity +event Mint( + address sender, + address indexed owner, + int24 indexed tickLower, + int24 indexed tickUpper, + uint128 amount, + uint256 amount0, + uint256 amount1 +); +*/ + +public class MintEventLog extends EventLog implements Serializable { + public static final String SIGNATURE = "Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)"; + + public String sender; + public String owner; + public int tickLower; + public int tickUpper; + @BigInt + public BigInteger amount; + @BigInt + public BigInteger amount0; + @BigInt + public BigInteger amount1; + +} diff --git a/src/main/java/stream/dto/Transaction.java b/src/main/java/stream/dto/Transaction.java index 7fe2f3f..424c604 100644 --- a/src/main/java/stream/dto/Transaction.java +++ b/src/main/java/stream/dto/Transaction.java @@ -1,12 +1,18 @@ package stream.dto; +import java.io.Serial; +import java.io.Serializable; import java.util.List; -public class Transaction { +public class Transaction implements Serializable { + @Serial + private static final long serialVersionUID = 1L; public String hash; public String nonce; public String blockHash; + @HexLong public Long blockNumber; + @HexLong public Long transactionIndex; public String from; public String to; @@ -20,11 +26,11 @@ public class Transaction { public String creates; // contract creation address if this tx created a contract public String chainId; public String type; // transaction type (0 = legacy, 1 = EIP-2930, 2 = EIP-1559) - + // EIP-1559 specific fields public String maxFeePerGas; public String maxPriorityFeePerGas; - + // Receipt fields often included with transaction public Long cumulativeGasUsed; public Long gasUsed; diff --git a/src/main/java/stream/io/BigIntDeserializer.java b/src/main/java/stream/io/BigIntDeserializer.java new file mode 100644 index 0000000..cda743e --- /dev/null +++ b/src/main/java/stream/io/BigIntDeserializer.java @@ -0,0 +1,25 @@ +package stream.io; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; + +import java.io.IOException; +import java.math.BigInteger; + +public class BigIntDeserializer extends JsonDeserializer { + @Override + public BigInteger deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + String value = p.getValueAsString(); + if (value == null || value.isEmpty()) { + return null; + } + + // Remove "0x" prefix if present + if (value.startsWith("0x")) { + value = value.substring(2); + } + + return new BigInteger(value, 16); + } +} \ No newline at end of file diff --git a/src/main/java/stream/io/BigIntSerializer.java b/src/main/java/stream/io/BigIntSerializer.java new file mode 100644 index 0000000..b403534 --- /dev/null +++ b/src/main/java/stream/io/BigIntSerializer.java @@ -0,0 +1,19 @@ +package stream.io; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import java.io.IOException; +import java.math.BigInteger; + +public class BigIntSerializer extends JsonSerializer { + @Override + public void serialize(BigInteger value, JsonGenerator gen, SerializerProvider provider) throws IOException { + if (value == null) { + gen.writeNull(); + } else { + gen.writeString("0x" + value.toString(16)); + } + } +} \ No newline at end of file diff --git a/src/main/java/stream/io/EthUtils.java b/src/main/java/stream/io/EthUtils.java new file mode 100644 index 0000000..0ee8d7f --- /dev/null +++ b/src/main/java/stream/io/EthUtils.java @@ -0,0 +1,40 @@ +package stream.io; + +import org.bouncycastle.jcajce.provider.digest.Keccak; +import org.bouncycastle.pqc.math.linearalgebra.ByteUtils; + +public class EthUtils { + public static boolean isValidAddress(String address) { + if (!address.matches("^0x[0-9a-fA-F]{40}$")) { + return false; + } + + // For non-checksum addresses, just verify hex format + if (address.equals(address.toLowerCase()) || address.equals(address.toUpperCase())) { + return true; + } + + // For checksum addresses, verify the checksum + return address.equals(toChecksumAddress(address.toLowerCase())); + } + + public static String toChecksumAddress(String address) { + address = address.toLowerCase().replace("0x", ""); + String hashHex = keccak(address); + + StringBuilder result = new StringBuilder("0x"); + for (int i = 0; i < address.length(); i++) { + if (Integer.parseInt(String.valueOf(hashHex.charAt(i)), 16) >= 8) { + result.append(String.valueOf(address.charAt(i)).toUpperCase()); + } else { + result.append(address.charAt(i)); + } + } + return result.toString(); + } + + public static String keccak(String message) { + byte[] hash = new Keccak.Digest256().digest(message.getBytes()); + return "0x"+ByteUtils.toHexString(hash); + } +} diff --git a/src/main/java/stream/io/EthereumWebSocketClient.java b/src/main/java/stream/io/EthereumWebSocketClient.java index 0be91ef..5c857f6 100644 --- a/src/main/java/stream/io/EthereumWebSocketClient.java +++ b/src/main/java/stream/io/EthereumWebSocketClient.java @@ -46,6 +46,16 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient { subscribe(new Object[]{"logs", filterParams}, callback); } + /** + * Subscribe to logs with a more detailed filter. + * + * @param filter The filter to apply to the logs + * @param callback The callback to invoke when a log is received + */ + public void subscribeLogFilter(ObjectNode filter, Consumer callback) { + subscribe(new Object[]{"logs", filter}, callback); + } + private void subscribe(Object params, Consumer callback) { Subscription subscription = new Subscription("eth_subscribe", params, callback); String requestId = sendRpc("eth_subscribe", params); @@ -59,7 +69,7 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient { super.onMessage(message); try { JsonNode node = mapper.readTree(message); - + // Handle subscription response if (node.has("id")) { String id = node.get("id").asText(); @@ -70,7 +80,7 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient { activeSubscriptions.put(subscriptionId, subscription); } } - + // Handle subscription notification if (node.has("method") && "eth_subscription".equals(node.get("method").asText())) { JsonNode params = node.get("params"); @@ -83,7 +93,7 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient { } catch (IOException e) { logger.error("Error processing message", e); } - + } @Override @@ -105,4 +115,4 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient { activeSubscriptions.remove(subscriptionId); } } -} \ No newline at end of file +} diff --git a/src/main/java/stream/source/eventlog/EventLogSource.java b/src/main/java/stream/source/eventlog/EventLogSource.java new file mode 100644 index 0000000..f79ce4b --- /dev/null +++ b/src/main/java/stream/source/eventlog/EventLogSource.java @@ -0,0 +1,195 @@ +package stream.source.eventlog; + +import org.apache.flink.api.connector.source.*; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import stream.dto.ElaboratedEventType; +import stream.dto.EventLog; +import stream.dto.EventLogFilter; +import stream.dto.EventType; + +import java.io.IOException; +import java.net.URI; +import java.util.List; + +/** + * A Flink Source that emits event logs from the Ethereum blockchain by subscribing to log events. + */ +public class EventLogSource implements Source { + + private static final Logger LOG = LoggerFactory.getLogger(EventLogSource.class); + + private final URI websocketUri; + private final EventLogFilter filter; + private final EventType eventType; + + /** + * Creates a new EventLogSource. + * + * @param websocketUri The URI of the Ethereum websocket endpoint + * @param filter The filter to apply to the event logs + */ + public EventLogSource(URI websocketUri, EventLogFilter filter, EventType eventType) { + this.websocketUri = websocketUri; + this.filter = filter; + this.eventType = eventType; + } + + @Override + public Boundedness getBoundedness() { + // This source is unbounded as it continuously receives new event logs + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) { + return new EventLogSourceReader(websocketUri, filter, readerContext, eventType); + } + + @Override + public SplitEnumerator createEnumerator(SplitEnumeratorContext enumContext) { + return new EventLogSplitEnumerator(enumContext); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, EventLogEnumeratorState state) { + return new EventLogSplitEnumerator(enumContext); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new EventLogSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new EventLogEnumeratorStateSerializer(); + } + + /** + * A simple split for the EventLogSource. Since we're just subscribing to a stream of events, + * we don't need to partition the source, so we use a single dummy split. + */ + public static class EventLogSplit implements SourceSplit { + private static final String SPLIT_ID = "eventlog-split"; + + @Override + public String splitId() { + return SPLIT_ID; + } + } + + /** + * A simple state for the EventLogSource enumerator. Since we're just subscribing to a stream of events, + * we don't need to track any state, so this is just a placeholder. + */ + public static class EventLogEnumeratorState { + // No state needed + } + + /** + * A simple serializer for EventLogSplit. + */ + private static class EventLogSplitSerializer implements SimpleVersionedSerializer { + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(EventLogSplit split) { + // Since our split is just a dummy, we can return an empty byte array + return new byte[0]; + } + + @Override + public EventLogSplit deserialize(int version, byte[] serialized) { + // Since our split is just a dummy, we can create a new instance + return new EventLogSplit(); + } + } + + /** + * A simple serializer for EventLogEnumeratorState. + */ + private static class EventLogEnumeratorStateSerializer implements SimpleVersionedSerializer { + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(EventLogEnumeratorState state) { + // Since our state is just a dummy, we can return an empty byte array + return new byte[0]; + } + + @Override + public EventLogEnumeratorState deserialize(int version, byte[] serialized) { + // Since our state is just a dummy, we can create a new instance + return new EventLogEnumeratorState(); + } + } + + /** + * A simple enumerator for the EventLogSource. Since we're just subscribing to a stream of events, + * we don't need to partition the source, so we just assign a single dummy split to the first reader. + */ + private static class EventLogSplitEnumerator implements SplitEnumerator { + private final SplitEnumeratorContext context; + private boolean splitAssigned = false; + + public EventLogSplitEnumerator(SplitEnumeratorContext context) { + this.context = context; + } + + @Override + public void start() { + // If we have any readers, assign the split + if (context.registeredReaders().size() > 0) { + assignSplit(); + } + } + + @Override + public void handleSplitRequest(int subtaskId, String requesterHostname) { + // Ignore split requests, we assign splits proactively + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + // If a reader failed and we get splits back, we'll reassign them when a new reader registers + splitAssigned = false; + } + + @Override + public void addReader(int subtaskId) { + // When a new reader registers, assign the split if it hasn't been assigned yet + if (!splitAssigned) { + assignSplit(); + } + } + + private void assignSplit() { + if (!context.registeredReaders().isEmpty()) { + // Assign the split to the first reader + int firstReader = context.registeredReaders().keySet().iterator().next(); + context.assignSplit(new EventLogSplit(), firstReader); + splitAssigned = true; + LOG.info("Assigned EventLogSplit to reader {}", firstReader); + } + } + + @Override + public EventLogEnumeratorState snapshotState(long checkpointId) throws Exception { + return new EventLogEnumeratorState(); + } + + @Override + public void close() throws IOException { + // No resources to clean up + } + } +} \ No newline at end of file diff --git a/src/main/java/stream/source/eventlog/EventLogSourceFactory.java b/src/main/java/stream/source/eventlog/EventLogSourceFactory.java new file mode 100644 index 0000000..af1a648 --- /dev/null +++ b/src/main/java/stream/source/eventlog/EventLogSourceFactory.java @@ -0,0 +1,33 @@ +package stream.source.eventlog; + +import org.apache.flink.api.connector.source.Source; +import stream.dto.ElaboratedEventType; +import stream.dto.EventLog; +import stream.dto.EventLogFilter; +import stream.dto.EventType; + +import java.net.URI; + +/** + * A factory class for creating EventLogSource and EventLogSubscriber instances. + */ +public class EventLogSourceFactory { + + /** + * Creates a new EventLogSource for the given websocket URI and filter. + * This source will emit event logs from the Ethereum blockchain by subscribing to log events. + * + * @param websocketUri The URI of the Ethereum websocket endpoint + * @param filter The filter to apply to the event logs + * @return A new EventLogSource + */ + public static Source createSource(URI websocketUri, EventType eventType) { + EventLogFilter filter = new EventLogFilter(); + filter.topics.add(new ElaboratedEventType(eventType).hash); + return createSource(websocketUri, eventType, filter); + } + + public static Source createSource(URI websocketUri, EventType eventType, EventLogFilter filter) { + return new EventLogSource<>(websocketUri, filter, eventType); + } +} \ No newline at end of file diff --git a/src/main/java/stream/source/eventlog/EventLogSourceReader.java b/src/main/java/stream/source/eventlog/EventLogSourceReader.java new file mode 100644 index 0000000..1191116 --- /dev/null +++ b/src/main/java/stream/source/eventlog/EventLogSourceReader.java @@ -0,0 +1,127 @@ +package stream.source.eventlog; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import stream.dto.ElaboratedEventType; +import stream.dto.EventLog; +import stream.dto.EventLogFilter; +import stream.dto.EventType; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * A Flink SourceReader that reads event logs from the Ethereum blockchain by subscribing to log events. + */ +public class EventLogSourceReader implements SourceReader { + + private static final Logger LOG = LoggerFactory.getLogger(EventLogSourceReader.class); + + private final URI websocketUri; + private final EventLogFilter filter; + private final List assignedSplits; + private final EventType eventType; + + private EventLogSubscriber subscriber; + + /** + * Creates a new EventLogSourceReader. + * + * @param websocketUri The URI of the Ethereum websocket endpoint + * @param filter The filter to apply to the event logs + * @param context The source reader context + */ + public EventLogSourceReader(URI websocketUri, EventLogFilter filter, @SuppressWarnings("unused") SourceReaderContext context, EventType eventType) { + this.websocketUri = websocketUri; + this.filter = filter; + this.assignedSplits = new ArrayList<>(); + this.eventType = eventType; + } + + @Override + public void start() { + // We don't start the subscriber here, we wait until we get a split + } + + @SuppressWarnings("RedundantThrows") + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + // If we haven't been assigned a split yet, we can't do anything + if (assignedSplits.isEmpty()) { + return InputStatus.NOTHING_AVAILABLE; + } + + // If we haven't created the subscriber yet, create it now + if (subscriber == null) { + try { + subscriber = new EventLogSubscriber(websocketUri, filter, eventType); + subscriber.connect(); + LOG.info("Connected to Ethereum node at {} with filter: {}", websocketUri, filter); + } catch (Exception e) { + LOG.error("Error connecting to Ethereum node", e); + return InputStatus.NOTHING_AVAILABLE; + } + } + + // Try to get the next event log + T eventLog = subscriber.getNextLogNonBlocking(); + if (eventLog != null) { + // We got an event log, emit it + output.collect(eventLog); + LOG.debug("Emitted event log: {}", eventLog); + return InputStatus.MORE_AVAILABLE; + } else { + // No event log available right now + return InputStatus.NOTHING_AVAILABLE; + } + } + + @Override + public CompletableFuture isAvailable() { + // If we have a subscriber and it's running, we might get more data + if (subscriber != null && subscriber.isRunning()) { + return CompletableFuture.completedFuture(null); + } else { + return CompletableFuture.supplyAsync(() -> { + try { + // Wait a bit before checking again + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + } + } + + @Override + public List snapshotState(long checkpointId) { + // We don't need to snapshot any state, just return the assigned splits + return Collections.unmodifiableList(assignedSplits); + } + + @Override + public void addSplits(List splits) { + LOG.info("Adding {} splits", splits.size()); + assignedSplits.addAll(splits); + } + + @Override + public void notifyNoMoreSplits() { + LOG.info("No more splits will be assigned"); + } + + @Override + public void close() throws Exception { + if (subscriber != null) { + subscriber.close(); + } + } +} \ No newline at end of file diff --git a/src/main/java/stream/source/eventlog/EventLogSubscriber.java b/src/main/java/stream/source/eventlog/EventLogSubscriber.java new file mode 100644 index 0000000..566e2d0 --- /dev/null +++ b/src/main/java/stream/source/eventlog/EventLogSubscriber.java @@ -0,0 +1,170 @@ +package stream.source.eventlog; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import stream.dto.*; +import stream.io.EthereumWebSocketClient; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A utility class that subscribes to Ethereum event logs based on a filter and provides methods to get the logs. + * This class is not a Flink source itself, but can be used by a custom Flink source implementation. + */ +public class EventLogSubscriber implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(EventLogSubscriber.class); + + private final URI websocketUri; + private final EventLogFilter filter; + private final BlockingQueue logQueue; + private final AtomicBoolean running; + private final ElaboratedEventType eventType; + + private EthereumWebSocketClient client; + + /** + * Creates a new EventLogSubscriber. + * + * @param websocketUri The URI of the Ethereum websocket endpoint + * @param filter The filter to apply to the event logs + */ + public EventLogSubscriber(URI websocketUri, EventLogFilter filter, EventType eventType) { + this.websocketUri = websocketUri; + this.filter = filter; + this.logQueue = new LinkedBlockingQueue<>(); + this.running = new AtomicBoolean(true); + this.eventType = new ElaboratedEventType<>(eventType); + } + + /** + * Connects to the Ethereum node and subscribes to event logs based on the filter. + * + * @throws Exception If an error occurs while connecting or subscribing + */ + public void connect() throws Exception { + // Create and connect the client + client = new EthereumWebSocketClient(websocketUri); + client.connectBlocking(); + + // Create filter parameters + ObjectNode filterParams = mapper.createObjectNode(); + + // Add fromBlock and toBlock if specified + if (filter.fromBlock != null) { + filterParams.put("fromBlock", "0x" + Long.toHexString(filter.fromBlock)); + } + if (filter.toBlock != null) { + filterParams.put("toBlock", "0x" + Long.toHexString(filter.toBlock)); + } + + // Add addresses if specified + if (filter.addresses != null && !filter.addresses.isEmpty()) { + filterParams.set("address", mapper.valueToTree(filter.addresses)); + } + + // Add topics if specified + if (filter.topics != null && !filter.topics.isEmpty()) { + filterParams.set("topics", mapper.valueToTree(filter.topics)); + } + + // Subscribe to logs with the filter using the new method that supports the full filter + // Add includeUnfinalized if specified + if (filter.includeUnfinalized != null) { + filterParams.put("includeUnfinalized", filter.includeUnfinalized); + } + + client.subscribeLogFilter(filterParams, this::processEventLog); + + LOG.info("Subscribed to event logs from {} with filter: {}", websocketUri, filter); + } + + /** + * Processes an event log received from the Ethereum node. + * + * @param jsonNode The JSON representation of the event log + */ + private void processEventLog(JsonNode jsonNode) { + try { + // Convert the JSON to an EventLog + var data = jsonNode.get("data").asText(); + var topicNodes = jsonNode.get("topics").elements(); + List topics = new ArrayList<>(); + while (topicNodes.hasNext()) + topics.add(topicNodes.next().asText()); + if (!topics.get(0).equals(eventType.hash)) { + LOG.warn("Event signature mismatch. Expected: {}, Got: {}", eventType.hash, topics.get(0)); + return; + } + var argData = mapper.createObjectNode(); + var args = eventType.parseEventData(topics, data); + var entries = new Map.Entry[eventType.paramNames.size()]; + for (int i = 0; i < entries.length; i++) + argData.set(eventType.paramNames.get(i), mapper.valueToTree(args.get(i))); + @SuppressWarnings("unchecked") T eventLog = (T) this.eventType.eventLogClass.getDeclaredConstructor().newInstance(); + ObjectReader updater = mapper.readerForUpdating(eventLog); + updater.readValue(jsonNode); + updater.readValue(argData); + eventLog.args = args; + eventLog.sender = topics.get(0); + + logQueue.add(eventLog); + LOG.debug("Received event log: {}", eventLog); + } catch (Exception e) { + LOG.error("Error processing event log", e); + } + } + + private static final ObjectMapper mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + /** + * Gets the next event log from the queue, blocking if necessary until a log is available. + * + * @return The next event log + * @throws InterruptedException If the thread is interrupted while waiting + */ + public T getNextLog() throws InterruptedException { + return logQueue.take(); + } + + /** + * Gets the next event log from the queue, returning null if no log is available. + * + * @return The next event log, or null if no log is available + */ + public T getNextLogNonBlocking() { + return logQueue.poll(); + } + + /** + * Checks if the subscriber is running. + * + * @return True if the subscriber is running, false otherwise + */ + public boolean isRunning() { + return running.get(); + } + + @Override + public void close() throws Exception { + // Set running to false to signal that we're done + running.set(false); + + // Close the client if it exists + if (client != null) { + client.close(); + } + } +} diff --git a/src/main/java/stream/uniswap3/OptimizedLiquidityTracker.java b/src/main/java/stream/uniswap3/OptimizedLiquidityTracker.java new file mode 100644 index 0000000..d699a5c --- /dev/null +++ b/src/main/java/stream/uniswap3/OptimizedLiquidityTracker.java @@ -0,0 +1,470 @@ +package stream.uniswap3; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import stream.dto.AddressId; +import stream.dto.BurnEventLog; +import stream.dto.EventLog; +import stream.dto.MintEventLog; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +public class OptimizedLiquidityTracker extends KeyedProcessFunction { + // Uniswap V3 tick bounds + private static final int MIN_TICK = -887272; // at feeTier 0.01% + private static final int MAX_TICK = 887272; + private static final int TICK_RANGE = MAX_TICK - MIN_TICK + 1; + // Using BitSet for compact storage of which ticks are populated + private static final int BITS_PER_WORD = 64; // Since we'll use long[] + private static final int NUM_WORDS = (TICK_RANGE + BITS_PER_WORD - 1) / BITS_PER_WORD; + + private MapState liquidityNetByTick; + private ValueState currentLiquidity; + private ValueState currentTick; + private ValueState tickBitmap; // Using long[] for the bitmap + + + @Override + public void open(OpenContext openContext) throws Exception { + MapStateDescriptor mapDescriptor = + new MapStateDescriptor<>( + "liquidity-net-by-tick", + Types.INT, + TypeInformation.of(new TypeHint() {}) + ); + + ValueStateDescriptor liquidityDescriptor = + new ValueStateDescriptor<>( + "current-liquidity", + TypeInformation.of(new TypeHint() {}) + ); + + ValueStateDescriptor tickDescriptor = + new ValueStateDescriptor<>( + "current-tick", + Types.INT + ); + + ValueStateDescriptor bitmapDescriptor = + new ValueStateDescriptor<>( + "tick-bitmap", + TypeInformation.of(new TypeHint() {}) + ); + + liquidityNetByTick = getRuntimeContext().getMapState(mapDescriptor); + currentLiquidity = getRuntimeContext().getState(liquidityDescriptor); + currentTick = getRuntimeContext().getState(tickDescriptor); + tickBitmap = getRuntimeContext().getState(bitmapDescriptor); + } + + @Override + public void processElement(T event, KeyedProcessFunction.Context context, Collector out) throws Exception { + // Initialize if needed + if (currentLiquidity.value() == null) { + currentLiquidity.update(BigInteger.ZERO); + currentTick.update(0); + tickBitmap.update(new long[NUM_WORDS]); + } + + if (event instanceof MintEventLog m) { + mint(m.tickLower, m.tickUpper, m.amount); + } + else if (event instanceof BurnEventLog b) { + burn(b.tickLower, b.tickUpper, b.amount); + } + out.collect(event); + } + + /** + * Adds liquidity in a tick range [tickLower, tickUpper) + * @param tickLower The lower tick (inclusive) + * @param tickUpper The upper tick (exclusive) + * @param amount The amount of liquidity to add + */ + public void mint(int tickLower, int tickUpper, BigInteger amount) throws Exception { + validateStakeParams(tickLower, tickUpper, amount); + + // Add liquidity at lower tick boundary (entering the position) + updateLiquidityNetAtTick(tickLower, amount); + + // Remove liquidity at upper tick boundary (exiting the position) + updateLiquidityNetAtTick(tickUpper, amount.negate()); + + // If current tick is within range, update current liquidity + int currentTickValue = currentTick.value(); + if (currentTickValue >= tickLower && currentTickValue < tickUpper) { + BigInteger newLiquidity = currentLiquidity.value().add(amount); + currentLiquidity.update(newLiquidity); + } + } + + /** + * Removes liquidity from a tick range [tickLower, tickUpper) + * @param tickLower The lower tick (inclusive) + * @param tickUpper The upper tick (exclusive) + * @param amount The amount of liquidity to remove + */ + public void burn(int tickLower, int tickUpper, BigInteger amount) throws Exception { + validateStakeParams(tickLower, tickUpper, amount); + + // Remove liquidity at lower tick boundary (reverse of mint) + updateLiquidityNetAtTick(tickLower, amount.negate()); + + // Add liquidity at upper tick boundary (reverse of mint) + updateLiquidityNetAtTick(tickUpper, amount); + + // If current tick is within range, update current liquidity + int currentTickValue = currentTick.value(); + if (currentTickValue >= tickLower && currentTickValue < tickUpper) { + BigInteger currentLiquidityValue = currentLiquidity.value(); + if (currentLiquidityValue.compareTo(amount) < 0) { + throw new IllegalStateException("Insufficient liquidity to burn"); + } + BigInteger newLiquidity = currentLiquidityValue.subtract(amount); + currentLiquidity.update(newLiquidity); + } + } + + private void validateStakeParams(int tickLower, int tickUpper, BigInteger amount) { + if (tickLower >= tickUpper) { + throw new IllegalArgumentException("tickLower must be less than tickUpper"); + } + if (tickLower < MIN_TICK || tickUpper > MAX_TICK) { + throw new IllegalArgumentException("Tick range outside bounds"); + } + if (amount.compareTo(BigInteger.ZERO) <= 0) { + throw new IllegalArgumentException("Amount must be positive"); + } + } + + private void updateLiquidityNetAtTick(int tick, BigInteger liquidityDelta) throws Exception { + if (tick < MIN_TICK || tick > MAX_TICK) { + throw new IllegalArgumentException("Tick outside allowed range"); + } + + BigInteger currentNet = liquidityNetByTick.contains(tick) + ? liquidityNetByTick.get(tick) + : BigInteger.ZERO; + + BigInteger newNet = currentNet.add(liquidityDelta); + + // Update bitmap + long[] bitmap = tickBitmap.value(); + int normalizedTick = tick - MIN_TICK; + int wordIndex = normalizedTick / BITS_PER_WORD; + int bitIndex = normalizedTick % BITS_PER_WORD; + + if (newNet.equals(BigInteger.ZERO)) { + liquidityNetByTick.remove(tick); + // Clear bit + bitmap[wordIndex] &= ~(1L << bitIndex); + } else { + liquidityNetByTick.put(tick, newNet); + // Set bit + bitmap[wordIndex] |= (1L << bitIndex); + } + tickBitmap.update(bitmap); + } + + private void crossTick(int newTick) throws Exception { + int oldTick = currentTick.value(); + if (newTick == oldTick) { + return; + } + + BigInteger liquidityValue = currentLiquidity.value(); + long[] bitmap = tickBitmap.value(); + + if (newTick > oldTick) { + // Search forward + for (int tick = oldTick + 1; tick <= newTick; tick++) { + if (tick < MIN_TICK || tick > MAX_TICK) continue; + + int normalizedTick = tick - MIN_TICK; + int wordIndex = normalizedTick / BITS_PER_WORD; + int bitIndex = normalizedTick % BITS_PER_WORD; + + // Check if this tick has liquidity + if ((bitmap[wordIndex] & (1L << bitIndex)) != 0) { + liquidityValue = liquidityValue.add(liquidityNetByTick.get(tick)); + } + } + } else { + // Search backward + for (int tick = oldTick; tick > newTick; tick--) { + if (tick < MIN_TICK || tick > MAX_TICK) continue; + + int normalizedTick = tick - MIN_TICK; + int wordIndex = normalizedTick / BITS_PER_WORD; + int bitIndex = normalizedTick % BITS_PER_WORD; + + // Check if this tick has liquidity + if ((bitmap[wordIndex] & (1L << bitIndex)) != 0) { + liquidityValue = liquidityValue.subtract(liquidityNetByTick.get(tick)); + } + } + } + + currentLiquidity.update(liquidityValue); + currentTick.update(newTick); + } + + // Helper method to find next initialized tick + private int nextInitializedTick(int tick, boolean lte) throws Exception { + if (tick < MIN_TICK || tick > MAX_TICK) { + return lte ? MIN_TICK : MAX_TICK; + } + + long[] bitmap = tickBitmap.value(); + int normalizedTick = tick - MIN_TICK; + int wordIndex = normalizedTick / BITS_PER_WORD; + int bitIndex = normalizedTick % BITS_PER_WORD; + + if (lte) { + // Search backwards + // Check current word + long mask = (1L << bitIndex) - 1; + long bits = bitmap[wordIndex] & mask; + if (bits != 0) { + return MIN_TICK + (wordIndex * BITS_PER_WORD) + (63 - Long.numberOfLeadingZeros(bits)); + } + // Check previous words + for (int i = wordIndex - 1; i >= 0; i--) { + if (bitmap[i] != 0) { + return MIN_TICK + (i * BITS_PER_WORD) + (63 - Long.numberOfLeadingZeros(bitmap[i])); + } + } + return MIN_TICK; + } else { + // Search forwards + // Check current word + long mask = ~((1L << bitIndex) - 1); + long bits = bitmap[wordIndex] & mask; + if (bits != 0) { + return MIN_TICK + (wordIndex * BITS_PER_WORD) + Long.numberOfTrailingZeros(bits); + } + // Check following words + for (int i = wordIndex + 1; i < NUM_WORDS; i++) { + if (bitmap[i] != 0) { + return MIN_TICK + (i * BITS_PER_WORD) + Long.numberOfTrailingZeros(bitmap[i]); + } + } + return MAX_TICK; + } + } + + /** + * Calculate the total liquidity between two ticks without modifying state + * @param fromTick The starting tick (inclusive) + * @param toTick The ending tick (exclusive) + * @return The net liquidity that would be encountered moving from fromTick to toTick + */ + public BigInteger calculateLiquidityInRange(int fromTick, int toTick) throws Exception { + if (fromTick >= toTick) { + throw new IllegalArgumentException("fromTick must be less than toTick"); + } + if (fromTick < MIN_TICK || toTick > MAX_TICK) { + throw new IllegalArgumentException("Tick range outside bounds"); + } + + BigInteger liquidityDelta = BigInteger.ZERO; + long[] bitmap = tickBitmap.value(); + + // Determine direction and adjust loop accordingly + int startTick = fromTick; + int endTick = toTick; + + // Convert to bitmap indices + int startNormalizedTick = startTick - MIN_TICK; + int endNormalizedTick = endTick - MIN_TICK; + + int startWord = startNormalizedTick / BITS_PER_WORD; + int endWord = endNormalizedTick / BITS_PER_WORD; + + // Optimize by checking whole words when possible + for (int wordIndex = startWord; wordIndex <= endWord; wordIndex++) { + long word = bitmap[wordIndex]; + if (word == 0) { + continue; // Skip empty words + } + + // Calculate bit range for this word + int wordStartBit = (wordIndex == startWord) ? + startNormalizedTick % BITS_PER_WORD : 0; + int wordEndBit = (wordIndex == endWord) ? + endNormalizedTick % BITS_PER_WORD : BITS_PER_WORD; + + // Create mask for relevant bits in this word + long mask = -1L >>> (BITS_PER_WORD - (wordEndBit - wordStartBit)); + mask = mask << wordStartBit; + long relevantBits = word & mask; + + // Process each set bit in the word + while (relevantBits != 0) { + int bitIndex = Long.numberOfTrailingZeros(relevantBits); + int tick = MIN_TICK + (wordIndex * BITS_PER_WORD) + bitIndex; + + if (tick >= fromTick && tick < toTick) { + BigInteger liquidityAtTick = liquidityNetByTick.get(tick); + if (liquidityAtTick != null) { + liquidityDelta = liquidityDelta.add(liquidityAtTick); + } + } + + // Clear the processed bit + relevantBits &= ~(1L << bitIndex); + } + } + + return liquidityDelta; + } + + /** + * Overloaded version that returns both the net liquidity change and the absolute liquidity + * at each point in the range + * @param fromTick The starting tick (inclusive) + * @param toTick The ending tick (exclusive) + * @return A list of liquidity points + */ + public Pair> calculateLiquidityInRangeDetailed( + int fromTick, + int toTick + ) throws Exception { + if (fromTick >= toTick) { + throw new IllegalArgumentException("fromTick must be less than toTick"); + } + if (fromTick < MIN_TICK || toTick > MAX_TICK) { + throw new IllegalArgumentException("Tick range outside bounds"); + } + + List points = new ArrayList<>(); + BigInteger runningLiquidity = BigInteger.ZERO; + BigInteger netChange = BigInteger.ZERO; + long[] bitmap = tickBitmap.value(); + + int startNormalizedTick = fromTick - MIN_TICK; + int endNormalizedTick = toTick - MIN_TICK; + + int startWord = startNormalizedTick / BITS_PER_WORD; + int endWord = endNormalizedTick / BITS_PER_WORD; + + for (int wordIndex = startWord; wordIndex <= endWord; wordIndex++) { + long word = bitmap[wordIndex]; + if (word == 0) { + continue; + } + + int wordStartBit = (wordIndex == startWord) ? + startNormalizedTick % BITS_PER_WORD : 0; + int wordEndBit = (wordIndex == endWord) ? + endNormalizedTick % BITS_PER_WORD : BITS_PER_WORD; + + long mask = -1L >>> (BITS_PER_WORD - (wordEndBit - wordStartBit)); + mask = mask << wordStartBit; + long relevantBits = word & mask; + + while (relevantBits != 0) { + int bitIndex = Long.numberOfTrailingZeros(relevantBits); + int tick = MIN_TICK + (wordIndex * BITS_PER_WORD) + bitIndex; + + if (tick >= fromTick && tick < toTick) { + BigInteger liquidityAtTick = liquidityNetByTick.get(tick); + if (liquidityAtTick != null) { + netChange = netChange.add(liquidityAtTick); + runningLiquidity = runningLiquidity.add(liquidityAtTick); + points.add(new LiquidityPoint(tick, liquidityAtTick, runningLiquidity)); + } + } + + relevantBits &= ~(1L << bitIndex); + } + } + + return Pair.of(netChange, points); + } + + /** + * Returns a list of liquidities for each tick in the given range. + * @param fromTick The starting tick (inclusive) + * @param toTick The ending tick (exclusive) + * @return List of liquidity amounts, where index 0 corresponds to fromTick + */ + public List getLiquidityRange(int fromTick, int toTick) throws Exception { + if (fromTick >= toTick) { + throw new IllegalArgumentException("fromTick must be less than toTick"); + } + if (fromTick < MIN_TICK || toTick > MAX_TICK) { + throw new IllegalArgumentException("Tick range outside bounds"); + } + + int size = toTick - fromTick; + List liquidities = new ArrayList<>(size); + BigInteger runningLiquidity = BigInteger.ZERO; + + // Initialize all positions with zero + for (int i = 0; i < size; i++) { + liquidities.add(BigInteger.ZERO); + } + + // Use bitmap to efficiently find populated ticks + long[] bitmap = tickBitmap.value(); + int startNormalizedTick = fromTick - MIN_TICK; + int endNormalizedTick = toTick - MIN_TICK; + + int startWord = startNormalizedTick / BITS_PER_WORD; + int endWord = endNormalizedTick / BITS_PER_WORD; + + // Process each word in the bitmap + for (int wordIndex = startWord; wordIndex <= endWord; wordIndex++) { + long word = bitmap[wordIndex]; + if (word == 0) { + continue; + } + + int wordStartBit = (wordIndex == startWord) ? + startNormalizedTick % BITS_PER_WORD : 0; + int wordEndBit = (wordIndex == endWord) ? + endNormalizedTick % BITS_PER_WORD : BITS_PER_WORD; + + long mask = -1L >>> (BITS_PER_WORD - (wordEndBit - wordStartBit)); + mask = mask << wordStartBit; + long relevantBits = word & mask; + + while (relevantBits != 0) { + int bitIndex = Long.numberOfTrailingZeros(relevantBits); + int tick = MIN_TICK + (wordIndex * BITS_PER_WORD) + bitIndex; + + if (tick >= fromTick && tick < toTick) { + BigInteger liquidityAtTick = liquidityNetByTick.get(tick); + if (liquidityAtTick != null) { + runningLiquidity = runningLiquidity.add(liquidityAtTick); + // Update all subsequent positions with the new running liquidity + for (int i = tick - fromTick; i < size; i++) { + liquidities.set(i, runningLiquidity); + } + } + } + + relevantBits &= ~(1L << bitIndex); + } + } + + return liquidities; + } + + /** + * Helper class to represent liquidity at a specific tick + */ + public record LiquidityPoint(int tick, BigInteger liquidityDelta, BigInteger liquidityAfter) {} +} \ No newline at end of file diff --git a/src/main/java/stream/uniswap3/README-TickLiquidityTracker.md b/src/main/java/stream/uniswap3/README-TickLiquidityTracker.md new file mode 100644 index 0000000..cd7cc5e --- /dev/null +++ b/src/main/java/stream/uniswap3/README-TickLiquidityTracker.md @@ -0,0 +1,69 @@ +# TickLiquidityTracker + +## Overview + +The `TickLiquidityTracker` class replicates Uniswap v3's tick-based liquidity tracking system. In Uniswap v3, liquidity is concentrated within specific price ranges defined by ticks, rather than being distributed across the entire price curve as in previous versions. + +## How Uniswap v3 Tracks Liquidity + +Uniswap v3 uses a clever data structure to track liquidity: + +1. **Tick-Based Liquidity**: Each tick represents a specific price point. Liquidity providers can add liquidity within a range defined by a lower and upper tick. + +2. **Net Liquidity Changes**: At each tick boundary, Uniswap records the net change in liquidity that occurs when crossing that tick. This is stored as `liquidityNet` in the Uniswap v3 contracts. + +3. **Global Liquidity Tracking**: The protocol maintains a global `liquidity` variable that represents the current active liquidity at the current tick. + +4. **Efficient Range Queries**: To determine the liquidity at any given tick, Uniswap doesn't need to iterate through all positions. Instead, it can calculate it by starting with the global liquidity and adding/subtracting the net liquidity changes when crossing ticks. + +## Implementation Details + +The `TickLiquidityTracker` class implements this system with the following components: + +1. **Data Structures**: + - `TreeMap liquidityNetByTick`: Stores the net liquidity change at each tick + - `BigInteger liquidity`: Tracks the current global liquidity + - `int currentTick`: Tracks the current tick + +2. **Key Methods**: + - `updateLiquidityOnMint(int tickLower, int tickUpper, BigInteger amount)`: Updates liquidity when a new position is created + - `updateLiquidityOnBurn(int tickLower, int tickUpper, BigInteger amount)`: Updates liquidity when a position is removed + - `crossTick(int newTick)`: Updates the current tick and global liquidity when crossing ticks + - `getLiquidityAtTick(int tick)`: Calculates the liquidity at a specific tick without changing the current state + +## Usage Example + +```java +// Create a new tracker +TickLiquidityTracker tracker = new TickLiquidityTracker(); + +// Add a position from tick 10 to 20 with 500 liquidity +tracker.updateLiquidityOnMint(10, 20, BigInteger.valueOf(500)); + +// Move to tick 15 (inside the position range) +tracker.crossTick(15); + +// Get current liquidity +BigInteger currentLiquidity = tracker.getCurrentLiquidity(); + +// Calculate liquidity at a different tick without moving +BigInteger liquidityAtTick12 = tracker.getLiquidityAtTick(12); +``` + +For a more comprehensive demonstration, see the `TickLiquidityTrackerDemo` class. + +## Performance Considerations + +- The implementation uses a `TreeMap` to efficiently store and query tick data +- Crossing ticks has O(k) complexity where k is the number of ticks crossed +- Calculating liquidity at a specific tick has O(k) complexity where k is the number of ticks between the current tick and the target tick + +## Comparison with Uniswap v3 Implementation + +This implementation closely follows the approach used in Uniswap v3's core contracts, particularly: + +1. The use of net liquidity changes at tick boundaries +2. The method of calculating liquidity by traversing ticks +3. The efficient handling of tick crossings + +The main difference is that this implementation is in Java rather than Solidity, and uses `BigInteger` for precise arithmetic instead of Solidity's fixed-point arithmetic. \ No newline at end of file diff --git a/src/main/java/stream/uniswap3/TickLiquidityTracker.java b/src/main/java/stream/uniswap3/TickLiquidityTracker.java new file mode 100644 index 0000000..bd8218e --- /dev/null +++ b/src/main/java/stream/uniswap3/TickLiquidityTracker.java @@ -0,0 +1,218 @@ +package stream.uniswap3; + +import java.io.Serial; +import java.io.Serializable; +import java.math.BigInteger; +import java.util.Map; +import java.util.TreeMap; + +/** + * TickLiquidityTracker replicates Uniswap v3's tick-based liquidity tracking system. + * + * In Uniswap v3, liquidity is concentrated within specific price ranges defined by ticks. + * The protocol tracks: + * 1. The net liquidity change at each tick (liquidityNet) + * 2. The current global liquidity (liquidity) + * + * When the price crosses a tick, the global liquidity is updated by adding the net liquidity + * at that tick. This allows efficient calculation of liquidity at any price range. + */ +public class TickLiquidityTracker implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + // TreeMap to store the net liquidity change at each tick + // Key: tick index, Value: net liquidity change when crossing this tick + private final TreeMap liquidityNetByTick; + + // Current active liquidity + private BigInteger liquidity; + + // Current tick + private int currentTick; + + /** + * Creates a new TickLiquidityTracker with zero initial liquidity. + */ + public TickLiquidityTracker() { + this.liquidityNetByTick = new TreeMap<>(); + this.liquidity = BigInteger.ZERO; + this.currentTick = 0; + } + + /** + * Creates a new TickLiquidityTracker with specified initial liquidity and tick. + * + * @param initialLiquidity The initial global liquidity + * @param initialTick The initial current tick + */ + public TickLiquidityTracker(BigInteger initialLiquidity, int initialTick) { + this.liquidityNetByTick = new TreeMap<>(); + this.liquidity = initialLiquidity; + this.currentTick = initialTick; + } + + /** + * Updates the liquidity when a position is minted. + * + * @param tickLower The lower tick of the position + * @param tickUpper The upper tick of the position + * @param amount The amount of liquidity added + */ + public void updateLiquidityOnMint(int tickLower, int tickUpper, BigInteger amount) { + // Add liquidity at lower tick (when price crosses up) + updateLiquidityNetAtTick(tickLower, amount); + + // Remove liquidity at upper tick (when price crosses up) + updateLiquidityNetAtTick(tickUpper, amount.negate()); + + // If current tick is within the range, update current liquidity + if (currentTick >= tickLower && currentTick < tickUpper) { + liquidity = liquidity.add(amount); + } + } + + /** + * Updates the liquidity when a position is burned. + * + * @param tickLower The lower tick of the position + * @param tickUpper The upper tick of the position + * @param amount The amount of liquidity removed + */ + public void updateLiquidityOnBurn(int tickLower, int tickUpper, BigInteger amount) { + // Remove liquidity at lower tick (when price crosses up) + updateLiquidityNetAtTick(tickLower, amount.negate()); + + // Add liquidity at upper tick (when price crosses up) + updateLiquidityNetAtTick(tickUpper, amount); + + // If current tick is within the range, update current liquidity + if (currentTick >= tickLower && currentTick < tickUpper) { + liquidity = liquidity.subtract(amount); + } + } + + /** + * Updates the net liquidity at a specific tick. + * + * @param tick The tick index + * @param liquidityDelta The change in liquidity + */ + private void updateLiquidityNetAtTick(int tick, BigInteger liquidityDelta) { + BigInteger currentNet = liquidityNetByTick.getOrDefault(tick, BigInteger.ZERO); + liquidityNetByTick.put(tick, currentNet.add(liquidityDelta)); + } + + /** + * Moves the current tick and updates the global liquidity accordingly. + * + * @param newTick The new current tick + */ + public void crossTick(int newTick) { + // If moving to the same tick, do nothing + if (newTick == currentTick) { + return; + } + + // Determine direction of tick movement + boolean isMovingUp = newTick > currentTick; + + // Get all ticks that will be crossed + Map crossedTicks; + if (isMovingUp) { + // When moving up, we include the current tick and exclude the new tick + crossedTicks = liquidityNetByTick.subMap(currentTick, false, newTick, false); + } else { + // When moving down, we exclude the current tick and include the new tick + crossedTicks = liquidityNetByTick.subMap(newTick, false, currentTick, false); + } + + // Update liquidity by applying all crossed ticks + for (Map.Entry entry : crossedTicks.entrySet()) { + if (isMovingUp) { + // When moving up, add the net liquidity + liquidity = liquidity.add(entry.getValue()); + } else { + // When moving down, subtract the net liquidity + liquidity = liquidity.subtract(entry.getValue()); + } + } + + // Update current tick + currentTick = newTick; + } + + /** + * Calculates the liquidity at a specific tick. + * + * @param tick The tick to calculate liquidity for + * @return The liquidity at the specified tick + */ + public BigInteger getLiquidityAtTick(int tick) { + // If the requested tick is the current tick, return current liquidity + if (tick == currentTick) { + return liquidity; + } + + BigInteger result = liquidity; + + // If the requested tick is above current tick + if (tick > currentTick) { + // Add all liquidityNet values between current tick (exclusive) and target tick (exclusive) + Map ticksToAdd = liquidityNetByTick.subMap(currentTick, false, tick, false); + for (BigInteger delta : ticksToAdd.values()) { + result = result.add(delta); + } + } + // If the requested tick is below current tick + else { + // Subtract all liquidityNet values between target tick (exclusive) and current tick (exclusive) + Map ticksToSubtract = liquidityNetByTick.subMap(tick, false, currentTick, false); + for (BigInteger delta : ticksToSubtract.values()) { + result = result.subtract(delta); + } + } + + return result; + } + + /** + * Gets the current global liquidity. + * + * @return The current global liquidity + */ + public BigInteger getCurrentLiquidity() { + return liquidity; + } + + /** + * Gets the current tick. + * + * @return The current tick + */ + public int getCurrentTick() { + return currentTick; + } + + /** + * Gets the net liquidity change at a specific tick. + * + * @param tick The tick index + * @return The net liquidity change at the specified tick + */ + public BigInteger getLiquidityNetAtTick(int tick) { + return liquidityNetByTick.getOrDefault(tick, BigInteger.ZERO); + } + + /** + * Gets all ticks with non-zero liquidity net values. + * + * @return A map of tick indices to their net liquidity changes + */ + public Map getAllLiquidityNetTicks() { + return new TreeMap<>(liquidityNetByTick); + } + + +} \ No newline at end of file