DTOs and other infra

This commit is contained in:
tim
2025-09-22 17:34:05 -04:00
parent 0cd8e1e28f
commit a80593b9c2
30 changed files with 1822 additions and 23 deletions

View File

@@ -31,7 +31,7 @@ under the License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target> <maven.compiler.target>${target.java.version}</maven.compiler.target>
<target.java.version>11</target.java.version> <target.java.version>24</target.java.version>
<scala.binary.version>2.12</scala.binary.version> <scala.binary.version>2.12</scala.binary.version>
<flink.version>2.0.0</flink.version> <flink.version>2.0.0</flink.version>
<log4j.version>2.24.1</log4j.version> <log4j.version>2.24.1</log4j.version>
@@ -135,8 +135,8 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version> <version>3.1</version>
<configuration> <configuration>
<source>${target.java.version}</source> <source>16</source>
<target>${target.java.version}</target> <target>16</target>
</configuration> </configuration>
</plugin> </plugin>

View File

@@ -20,9 +20,11 @@ package stream;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.ParameterTool; import org.apache.flink.util.ParameterTool;
import stream.dto.ArbitrumOneBlock; import stream.dto.*;
import stream.source.eventlog.EventLogSourceFactory;
import stream.source.newheads.NewHeadsSourceFactory; import stream.source.newheads.NewHeadsSourceFactory;
import java.io.IOException; import java.io.IOException;
@@ -64,15 +66,19 @@ public class DataStreamJob {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT); mapper.enable(SerializationFeature.INDENT_OUTPUT);
DataStream<ArbitrumOneBlock> blockStream = env DataStream<ArbitrumOneBlock> arbitrumHeads = env
.fromSource( .fromSource(
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class), NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(), org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
"Ethereum Blocks Source", "ArbitrumOne Head Blocks",
TypeInformation.of(ArbitrumOneBlock.class) TypeInformation.of(ArbitrumOneBlock.class)
); );
DataStream<MintEventLog> mintStream = getEventStream(env, webSocketUri, MintEventLog.SIGNATURE, MintEventLog.class, "Uniswap v3 Mint Events");
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
// Map the blocks to pretty-printed JSON strings // Map the blocks to pretty-printed JSON strings
/*
blockStream blockStream
.map(block -> { .map(block -> {
try { try {
@@ -83,6 +89,58 @@ public class DataStreamJob {
}) })
.print("New Ethereum Block: "); .print("New Ethereum Block: ");
transferStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting transfer event to JSON: " + e.getMessage();
}
})
.print("Transfer Event: ");
swapStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting swap event to JSON: " + e.getMessage();
}
})
.print("Swap Event: ");
*/
mintStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting mint event to JSON: " + e.getMessage();
}
})
.print("Mint Event: ");
burnStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting burn event to JSON: " + e.getMessage();
}
})
.print("Burn Event: ");
env.execute("Ethereum Block Stream"); env.execute("Ethereum Block Stream");
} }
private static<T extends EventLog> DataStreamSource<T> getEventStream(StreamExecutionEnvironment env, URI webSocketUri,
String eventSignature, Class<T> eventLogClass, String streamName) {
var eventType = new EventType<>(eventSignature, eventLogClass);
return env.fromSource(
EventLogSourceFactory.<T>createSource(webSocketUri, eventType),
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
streamName,
TypeInformation.of(eventType.eventLogClass)
);
}
} }

View File

@@ -0,0 +1,5 @@
package stream.dto;
/**
 * Identifies a specific address on a specific chain (chain id + address string).
 */
public class AddressId extends ChainId {

    // Explicit serialVersionUID for a stable serialized form, matching the
    // convention used by the other Serializable DTOs in this package.
    private static final long serialVersionUID = 1L;

    // Hex-encoded address string (e.g. "0x..."); format is not validated here.
    public String address;
}

View File

@@ -1,6 +1,10 @@
package stream.dto; package stream.dto;
import java.io.Serial;
public class ArbitrumOneBlock extends EthereumBlock { public class ArbitrumOneBlock extends EthereumBlock {
@Serial
private static final long serialVersionUID = 1L;
public String l1BlockNumber; public String l1BlockNumber;
public String sendRoot; public String sendRoot;
public String sendCount; public String sendCount;
@@ -9,5 +13,5 @@ public class ArbitrumOneBlock extends EthereumBlock {
public String l1Hash; public String l1Hash;
public Long l1Timestamp; public Long l1Timestamp;
public String sequencerAddress; public String sequencerAddress;
} }

View File

@@ -0,0 +1,20 @@
package stream.dto;
import com.fasterxml.jackson.annotation.JacksonAnnotationsInside;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import stream.io.BigIntDeserializer;
import stream.io.BigIntSerializer;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Jackson annotation bundle that (de)serializes a {@code java.math.BigInteger}
 * field or parameter as a hex string via {@code BigIntSerializer} /
 * {@code BigIntDeserializer}. Meta-annotated with
 * {@code @JacksonAnnotationsInside} so Jackson expands it into the two
 * serializer annotations below.
 */
@Target({ElementType.FIELD, ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
@JacksonAnnotationsInside
@JsonSerialize(using = BigIntSerializer.class)
@JsonDeserialize(using = BigIntDeserializer.class)
public @interface BigInt {
}

View File

@@ -1,5 +1,11 @@
package stream.dto; package stream.dto;
public class Block { import java.io.Serial;
import java.io.Serializable;
public class Block implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
public Long chainId; public Long chainId;
} }

View File

@@ -1,6 +1,11 @@
package stream.dto; package stream.dto;
import java.io.Serial;
public class BlockHash extends BlockId { public class BlockHash extends BlockId {
@Serial
private static final long serialVersionUID = 1L;
public String hash; public String hash;
@Override @Override

View File

@@ -1,6 +1,10 @@
package stream.dto; package stream.dto;
import java.io.Serial;
public abstract class BlockId extends Block { public abstract class BlockId extends Block {
@Serial
private static final long serialVersionUID = 1L;
abstract public Object getId(); abstract public Object getId();
} }

View File

@@ -1,6 +1,11 @@
package stream.dto; package stream.dto;
import java.io.Serial;
public class BlockNumber extends BlockId { public class BlockNumber extends BlockId {
@Serial
private static final long serialVersionUID = 1L;
public long number; public long number;
@Override @Override

View File

@@ -0,0 +1,39 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
/*
/// @notice Emitted when a position's liquidity is removed
/// @dev Does not withdraw any fees earned by the liquidity position, which must be withdrawn via #collect
/// @param owner The owner of the position for which liquidity is removed
/// @param tickLower The lower tick of the position
/// @param tickUpper The upper tick of the position
/// @param amount The amount of liquidity to remove
/// @param amount0 The amount of token0 withdrawn
/// @param amount1 The amount of token1 withdrawn
event Burn(
address indexed owner,
int24 indexed tickLower,
int24 indexed tickUpper,
uint128 amount,
uint256 amount0,
uint256 amount1
);
*/
public class BurnEventLog extends EventLog implements Serializable {

    // Event signature as declared in the Uniswap v3 pool ABI; parsed elsewhere
    // to derive the canonical signature and topic hash.
    public static final String SIGNATURE = "Burn(address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)";

    // Explicit serialVersionUID for a stable serialized form, matching the
    // convention used by the other DTOs in this package.
    private static final long serialVersionUID = 1L;

    /** Owner of the position whose liquidity was removed (indexed). */
    public String owner;
    /** Lower tick of the position (indexed). */
    public int tickLower;
    /** Upper tick of the position (indexed). */
    public int tickUpper;
    /** Amount of liquidity removed. */
    @BigInt
    public BigInteger amount;
    /** Amount of token0 withdrawn. */
    @BigInt
    public BigInteger amount0;
    /** Amount of token1 withdrawn. */
    @BigInt
    public BigInteger amount1;
}

View File

@@ -0,0 +1,7 @@
package stream.dto;
import java.io.Serializable;
/**
 * Base identifier carrying the numeric chain id (e.g. 42161 for Arbitrum One).
 * NOTE(review): {@code Block.chainId} elsewhere in this package is a
 * {@code Long}; confirm whether the two should share a type.
 */
public class ChainId implements Serializable {

    // Explicit serialVersionUID for a stable serialized form.
    private static final long serialVersionUID = 1L;

    /** Numeric chain id. */
    public int chainId;
}

View File

@@ -0,0 +1,158 @@
package stream.dto;
import org.web3j.abi.FunctionReturnDecoder;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Bool;
import org.web3j.abi.datatypes.Type;
import org.web3j.abi.datatypes.Utf8String;
import org.web3j.abi.datatypes.generated.*;
import java.io.Serial;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import static stream.io.EthUtils.keccak;
@SuppressWarnings("rawtypes")
public class ElaboratedEventType<T extends EventLog> extends EventType {
    // Derived, non-serialized views of the signature.
    // NOTE(review): these transient fields are null after Java deserialization;
    // confirm instances are always re-elaborated rather than deserialized.
    public transient String name;
    public transient List<String> paramNames;
    public transient List<TypeReference<Type>> paramTypes;
    public transient String hash;

    private static final String SIGNATURE_REGEX = "^([a-zA-Z_][a-zA-Z0-9_]*)\\((|[^)]*)\\)$";
    private static final String PARAM_REGEX = "([a-zA-Z0-9]+(?:\\[(?:[0-9]*|\\s*)])*)\\s+(?:(indexed)\\s+)?([a-zA-Z_][a-zA-Z0-9_]*)";

    /**
     * Parses the human-readable event signature into the event name, the
     * parameter names/types (with their indexed flags) and the canonical
     * keccak topic hash.
     *
     * @param eventType the signature/DTO-class pair to elaborate
     * @throws IllegalArgumentException if the signature or any parameter type
     *                                  cannot be parsed
     */
    public ElaboratedEventType(EventType<T> eventType) {
        super(eventType.signature, eventType.eventLogClass);
        var matcher = Pattern.compile(SIGNATURE_REGEX).matcher(signature);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid event signature format: " + signature);
        }
        this.name = matcher.group(1);
        this.paramNames = new ArrayList<>();
        this.paramTypes = new ArrayList<>();
        var paramTypeNames = new ArrayList<String>();
        String paramsStr = matcher.group(2).trim();
        if (!paramsStr.isEmpty()) {
            var paramMatcher = Pattern.compile(PARAM_REGEX).matcher(paramsStr);
            while (paramMatcher.find()) {
                String type = paramMatcher.group(1);
                boolean indexed = paramMatcher.group(2) != null;
                String name = paramMatcher.group(3);
                Class<Type> web3jType = typeMap.get(type);
                if (web3jType == null) {
                    // Fail fast with context instead of NPE-ing inside TypeReference.create.
                    throw new IllegalArgumentException(
                            "Unsupported parameter type '" + type + "' in signature: " + signature);
                }
                paramTypeNames.add(type);
                this.paramNames.add(name);
                this.paramTypes.add(TypeReference.create(web3jType, indexed));
            }
        }
        // Canonical form, e.g. "Burn(address,int24,int24,uint128,uint256,uint256)",
        // is what gets hashed into the log topic.
        String canonical = this.name + "(" + String.join(",", paramTypeNames) + ")";
        this.hash = keccak(canonical);
    }

    /**
     * Decodes the indexed (topics) and non-indexed (data) parameters into a
     * single argument list in signature order.
     *
     * @param topics log topics; topics[0] is the event signature hash, the
     *               remaining entries are the indexed parameters in order
     * @param data   ABI-encoded non-indexed parameters
     * @return decoded argument values, one per signature parameter
     */
    public List<Object> parseEventData(List<String> topics, String data) {
        // topics[0] is the signature hash, so expect one topic per indexed
        // parameter PLUS one. (The previous check omitted the +1 and would
        // fail for any event with indexed parameters when assertions are on.)
        assert topics.size() == this.paramTypes.stream().filter(TypeReference::isIndexed).count() + 1;
        List<TypeReference<Type>> nonIndexedParamTypes =
                this.paramTypes.stream().filter(t -> !t.isIndexed()).toList();
        // Typed instead of raw List to keep the unchecked surface minimal.
        List<Object> dataValues = FunctionReturnDecoder.decode(data, nonIndexedParamTypes).stream()
                .map(t -> (Object) t.getValue())
                .toList();
        int dataValuesIndex = 0;
        int topicsIndex = 1; // the first topic is the event signature
        List<Object> args = new ArrayList<>();
        for (TypeReference<Type> paramType : this.paramTypes) {
            Object value;
            if (paramType.isIndexed()) {
                String encoded = topics.get(topicsIndex++);
                value = FunctionReturnDecoder.decodeIndexedValue(encoded, paramType).getValue();
            } else {
                value = dataValues.get(dataValuesIndex++);
            }
            args.add(value);
        }
        return args;
    }

    // Maps Solidity type names to the corresponding web3j ABI type classes.
    private static final Map<String, Class<Type>> typeMap = Map.<String, Class<Type>>ofEntries(
            typeEntry("address", Address.class),
            typeEntry("bool", Bool.class),
            typeEntry("string", Utf8String.class),
            typeEntry("uint8", Uint8.class),
            typeEntry("uint16", Uint16.class),
            typeEntry("uint24", Uint24.class),
            typeEntry("uint32", Uint32.class),
            typeEntry("uint40", Uint40.class),
            typeEntry("uint48", Uint48.class),
            typeEntry("uint56", Uint56.class),
            typeEntry("uint64", Uint64.class),
            typeEntry("uint72", Uint72.class),
            typeEntry("uint80", Uint80.class),
            typeEntry("uint88", Uint88.class),
            typeEntry("uint96", Uint96.class),
            typeEntry("uint104", Uint104.class),
            typeEntry("uint112", Uint112.class),
            typeEntry("uint120", Uint120.class),
            typeEntry("uint128", Uint128.class),
            typeEntry("uint136", Uint136.class),
            typeEntry("uint144", Uint144.class),
            typeEntry("uint152", Uint152.class),
            typeEntry("uint160", Uint160.class),
            typeEntry("uint168", Uint168.class),
            typeEntry("uint176", Uint176.class),
            typeEntry("uint184", Uint184.class),
            typeEntry("uint192", Uint192.class),
            typeEntry("uint200", Uint200.class),
            typeEntry("uint208", Uint208.class),
            typeEntry("uint216", Uint216.class),
            typeEntry("uint224", Uint224.class),
            typeEntry("uint232", Uint232.class),
            typeEntry("uint240", Uint240.class),
            typeEntry("uint248", Uint248.class),
            typeEntry("uint256", Uint256.class),
            typeEntry("int8", Int8.class),
            typeEntry("int16", Int16.class),
            typeEntry("int24", Int24.class),
            typeEntry("int32", Int32.class),
            typeEntry("int40", Int40.class),
            typeEntry("int48", Int48.class),
            typeEntry("int56", Int56.class),
            typeEntry("int64", Int64.class),
            typeEntry("int72", Int72.class),
            typeEntry("int80", Int80.class),
            typeEntry("int88", Int88.class),
            typeEntry("int96", Int96.class),
            typeEntry("int104", Int104.class),
            typeEntry("int112", Int112.class),
            typeEntry("int120", Int120.class),
            typeEntry("int128", Int128.class),
            typeEntry("int136", Int136.class),
            typeEntry("int144", Int144.class),
            typeEntry("int152", Int152.class),
            typeEntry("int160", Int160.class),
            typeEntry("int168", Int168.class),
            typeEntry("int176", Int176.class),
            typeEntry("int184", Int184.class),
            typeEntry("int192", Int192.class),
            typeEntry("int200", Int200.class),
            typeEntry("int208", Int208.class),
            typeEntry("int216", Int216.class),
            typeEntry("int224", Int224.class),
            typeEntry("int232", Int232.class),
            typeEntry("int240", Int240.class),
            typeEntry("int248", Int248.class),
            typeEntry("int256", Int256.class)
    );

    // Narrow the unchecked cast to a single, documented helper.
    private static <T extends Type> Map.Entry<String, Class<Type>> typeEntry(String name, Class<T> type) {
        //noinspection unchecked
        return Map.entry(name, (Class<Type>) type);
    }

    @Serial
    private static final long serialVersionUID = 1L;
}

View File

@@ -5,13 +5,15 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import stream.io.HexLongDeserializer; import stream.io.HexLongDeserializer;
import stream.io.HexLongSerializer; import stream.io.HexLongSerializer;
import java.io.Serial;
import java.util.List; import java.util.List;
public class EthereumBlock extends Block { public class EthereumBlock extends Block {
@Serial
private static final long serialVersionUID = 1L;
public String hash; public String hash;
@JsonSerialize(using = HexLongSerializer.class) @HexLong
@JsonDeserialize(using = HexLongDeserializer.class)
public Long number; public Long number;
public String parentHash; public String parentHash;
public String nonce; public String nonce;
@@ -27,9 +29,8 @@ public class EthereumBlock extends Block {
public Long size; public Long size;
public String gasLimit; public String gasLimit;
public String gasUsed; public String gasUsed;
@JsonSerialize(using = HexLongSerializer.class) @HexLong
@JsonDeserialize(using = HexLongDeserializer.class)
public Long timestamp; public Long timestamp;
public List<String> transactions; public List<String> transactions;
public List<String> uncles; public List<String> uncles;
} }

View File

@@ -1,14 +1,27 @@
package stream.dto; package stream.dto;
import java.io.Serial;
import java.io.Serializable;
import java.util.List; import java.util.List;
public class EventLog { public class EventLog implements Serializable {
public boolean removed; // true if log was removed due to chain reorg public boolean removed; // true if log was removed due to chain reorg
@HexLong
public Long logIndex; public Long logIndex;
@HexLong
public Long transactionIndex; public Long transactionIndex;
public String transactionHash; public String transactionHash;
public String blockHash; public String blockHash;
@HexLong
public Long blockNumber; public Long blockNumber;
public String address; // contract that emitted the event public String address; // contract that emitted the event
public String data; // contains the non-indexed parameters public String data; // contains the non-indexed parameters
public List<String> topics; // contains the event signature and indexed parameters public List<String> topics; // contains the event signature and indexed parameters
// Populate these if args have been parsed out of data and topics
public transient List<Object> args = null;
public transient String sender; // from topics[1]
@Serial
private static final long serialVersionUID = 1L;
} }

View File

@@ -0,0 +1,18 @@
package stream.dto;
import java.io.Serial;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
 * Filter criteria for an Ethereum log subscription (eth_subscribe "logs" /
 * eth_getLogs style). All criteria are optional: null block bounds and empty
 * lists mean "no restriction" on that dimension.
 */
public class EventLogFilter implements Serializable {
    @Serial
    private static final long serialVersionUID = 1L;

    public Long fromBlock; // Starting block number (null means latest)
    public Long toBlock; // Ending block number (null means latest)
    public List<String> addresses = new ArrayList<>(); // Contract addresses to filter
    public List<String> topics = new ArrayList<>(); // Event topics to filter (can include the event signature)
    public Boolean includeUnfinalized = true; // Whether to include logs from unfinalized blocks
}

View File

@@ -0,0 +1,13 @@
package stream.dto;
import java.io.Serializable;
/**
 * Pairs an event's human-readable signature with the DTO class its logs
 * decode into.
 *
 * @param <T> concrete {@link EventLog} subclass produced for this event
 */
public class EventType<T extends EventLog> implements Serializable {

    // Explicit serialVersionUID for a stable serialized form, matching the
    // convention used by the other Serializable types in this package.
    private static final long serialVersionUID = 1L;

    /** Solidity-style event signature, e.g. "Burn(address indexed owner, ...)". */
    public String signature;
    /** DTO class the decoded log is mapped to. */
    public Class<T> eventLogClass;

    /**
     * @param signature     Solidity-style event signature
     * @param eventLogClass DTO class the decoded log is mapped to
     */
    public EventType(String signature, Class<T> eventLogClass) {
        this.signature = signature;
        this.eventLogClass = eventLogClass;
    }
}

View File

@@ -0,0 +1,20 @@
package stream.dto;
import com.fasterxml.jackson.annotation.JacksonAnnotationsInside;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import stream.io.HexLongDeserializer;
import stream.io.HexLongSerializer;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Jackson annotation bundle that (de)serializes a {@code Long} field or
 * parameter as a hex string via {@code HexLongSerializer} /
 * {@code HexLongDeserializer}. Meta-annotated with
 * {@code @JacksonAnnotationsInside} so Jackson expands it into the two
 * serializer annotations below.
 */
@Target({ElementType.FIELD, ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
@JacksonAnnotationsInside
@JsonSerialize(using = HexLongSerializer.class)
@JsonDeserialize(using = HexLongDeserializer.class)
public @interface HexLong {
}

View File

@@ -0,0 +1,41 @@
package stream.dto;
import java.io.Serializable;
import java.math.BigInteger;
/*
/// @notice Emitted when liquidity is minted for a given position
/// @param sender The address that minted the liquidity
/// @param owner The owner of the position and recipient of any minted liquidity
/// @param tickLower The lower tick of the position
/// @param tickUpper The upper tick of the position
/// @param amount The amount of liquidity minted to the position range
/// @param amount0 How much token0 was required for the minted liquidity
/// @param amount1 How much token1 was required for the minted liquidity
event Mint(
address sender,
address indexed owner,
int24 indexed tickLower,
int24 indexed tickUpper,
uint128 amount,
uint256 amount0,
uint256 amount1
);
*/
public class MintEventLog extends EventLog implements Serializable {

    // Event signature as declared in the Uniswap v3 pool ABI; parsed elsewhere
    // to derive the canonical signature and topic hash.
    public static final String SIGNATURE = "Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)";

    // Explicit serialVersionUID for a stable serialized form, matching the
    // convention used by the other DTOs in this package.
    private static final long serialVersionUID = 1L;

    /** Address that minted the liquidity (non-indexed). */
    public String sender;
    /** Owner of the position and recipient of the minted liquidity (indexed). */
    public String owner;
    /** Lower tick of the position (indexed). */
    public int tickLower;
    /** Upper tick of the position (indexed). */
    public int tickUpper;
    /** Amount of liquidity minted to the position range. */
    @BigInt
    public BigInteger amount;
    /** Amount of token0 required for the minted liquidity. */
    @BigInt
    public BigInteger amount0;
    /** Amount of token1 required for the minted liquidity. */
    @BigInt
    public BigInteger amount1;
}

View File

@@ -1,12 +1,18 @@
package stream.dto; package stream.dto;
import java.io.Serial;
import java.io.Serializable;
import java.util.List; import java.util.List;
public class Transaction { public class Transaction implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
public String hash; public String hash;
public String nonce; public String nonce;
public String blockHash; public String blockHash;
@HexLong
public Long blockNumber; public Long blockNumber;
@HexLong
public Long transactionIndex; public Long transactionIndex;
public String from; public String from;
public String to; public String to;
@@ -20,11 +26,11 @@ public class Transaction {
public String creates; // contract creation address if this tx created a contract public String creates; // contract creation address if this tx created a contract
public String chainId; public String chainId;
public String type; // transaction type (0 = legacy, 1 = EIP-2930, 2 = EIP-1559) public String type; // transaction type (0 = legacy, 1 = EIP-2930, 2 = EIP-1559)
// EIP-1559 specific fields // EIP-1559 specific fields
public String maxFeePerGas; public String maxFeePerGas;
public String maxPriorityFeePerGas; public String maxPriorityFeePerGas;
// Receipt fields often included with transaction // Receipt fields often included with transaction
public Long cumulativeGasUsed; public Long cumulativeGasUsed;
public Long gasUsed; public Long gasUsed;

View File

@@ -0,0 +1,25 @@
package stream.io;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import java.io.IOException;
import java.math.BigInteger;
/**
 * Jackson deserializer that reads a hex string (with or without a "0x"
 * prefix) into a {@link BigInteger}.
 */
public class BigIntDeserializer extends JsonDeserializer<BigInteger> {

    /**
     * @return the decoded value; null for a null/empty token; ZERO for a bare "0x"
     * @throws IOException           if the underlying parser fails
     * @throws NumberFormatException if the text is not valid hexadecimal
     */
    @Override
    public BigInteger deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
        String value = p.getValueAsString();
        if (value == null || value.isEmpty()) {
            return null;
        }
        // Remove "0x" prefix if present
        if (value.startsWith("0x")) {
            value = value.substring(2);
        }
        // A bare "0x" denotes zero; the previous code passed the empty string
        // to BigInteger and threw NumberFormatException.
        if (value.isEmpty()) {
            return BigInteger.ZERO;
        }
        return new BigInteger(value, 16);
    }
}

View File

@@ -0,0 +1,19 @@
package stream.io;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
import java.math.BigInteger;
/**
 * Jackson serializer that writes a {@link BigInteger} as a lowercase hex
 * string prefixed with "0x"; a null value is written as JSON null.
 */
public class BigIntSerializer extends JsonSerializer<BigInteger> {
    @Override
    public void serialize(BigInteger value, JsonGenerator gen, SerializerProvider provider) throws IOException {
        // Guard clause instead of if/else: null becomes a JSON null token.
        if (value == null) {
            gen.writeNull();
            return;
        }
        gen.writeString("0x" + value.toString(16));
    }
}

View File

@@ -0,0 +1,40 @@
package stream.io;
import org.bouncycastle.jcajce.provider.digest.Keccak;
import org.bouncycastle.pqc.math.linearalgebra.ByteUtils;
/**
 * Static helpers for Ethereum address validation and Keccak-256 hashing.
 */
public class EthUtils {

    private EthUtils() {
        // Utility class: static methods only, no instances.
    }

    /**
     * Returns true if {@code address} is a well-formed Ethereum address:
     * "0x" followed by 40 hex digits. All-lowercase and all-uppercase
     * addresses are accepted on format alone; mixed-case addresses must also
     * pass the EIP-55 checksum.
     */
    public static boolean isValidAddress(String address) {
        if (address == null || !address.matches("^0x[0-9a-fA-F]{40}$")) {
            return false;
        }
        // For non-checksum addresses, just verify hex format
        if (address.equals(address.toLowerCase()) || address.equals(address.toUpperCase())) {
            return true;
        }
        // For checksum addresses, verify the checksum
        return address.equals(toChecksumAddress(address.toLowerCase()));
    }

    /**
     * Converts a hex address to its EIP-55 checksummed form: a hex digit is
     * uppercased when the corresponding nibble of keccak(lowercase address)
     * is >= 8.
     */
    public static String toChecksumAddress(String address) {
        address = address.toLowerCase().replace("0x", "");
        // keccak() returns a "0x"-prefixed string; strip the prefix so the
        // per-character loop reads hash nibbles. (The previous code indexed
        // the prefixed string, so parseInt hit the 'x' at index 1 and threw
        // NumberFormatException for every input.)
        String hashHex = keccak(address).substring(2);
        StringBuilder result = new StringBuilder("0x");
        for (int i = 0; i < address.length(); i++) {
            if (Integer.parseInt(String.valueOf(hashHex.charAt(i)), 16) >= 8) {
                result.append(String.valueOf(address.charAt(i)).toUpperCase());
            } else {
                result.append(address.charAt(i));
            }
        }
        return result.toString();
    }

    /**
     * Keccak-256 of the message's UTF-8 bytes, returned as "0x"-prefixed
     * lowercase hex.
     */
    public static String keccak(String message) {
        // Explicit charset so the hash never depends on the platform default.
        byte[] hash = new Keccak.Digest256().digest(
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return "0x" + ByteUtils.toHexString(hash);
    }
}

View File

@@ -46,6 +46,16 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient {
subscribe(new Object[]{"logs", filterParams}, callback); subscribe(new Object[]{"logs", filterParams}, callback);
} }
/**
* Subscribe to logs with a more detailed filter.
*
* @param filter The filter to apply to the logs
* @param callback The callback to invoke when a log is received
*/
public void subscribeLogFilter(ObjectNode filter, Consumer<JsonNode> callback) {
subscribe(new Object[]{"logs", filter}, callback);
}
private void subscribe(Object params, Consumer<JsonNode> callback) { private void subscribe(Object params, Consumer<JsonNode> callback) {
Subscription subscription = new Subscription("eth_subscribe", params, callback); Subscription subscription = new Subscription("eth_subscribe", params, callback);
String requestId = sendRpc("eth_subscribe", params); String requestId = sendRpc("eth_subscribe", params);
@@ -59,7 +69,7 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient {
super.onMessage(message); super.onMessage(message);
try { try {
JsonNode node = mapper.readTree(message); JsonNode node = mapper.readTree(message);
// Handle subscription response // Handle subscription response
if (node.has("id")) { if (node.has("id")) {
String id = node.get("id").asText(); String id = node.get("id").asText();
@@ -70,7 +80,7 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient {
activeSubscriptions.put(subscriptionId, subscription); activeSubscriptions.put(subscriptionId, subscription);
} }
} }
// Handle subscription notification // Handle subscription notification
if (node.has("method") && "eth_subscription".equals(node.get("method").asText())) { if (node.has("method") && "eth_subscription".equals(node.get("method").asText())) {
JsonNode params = node.get("params"); JsonNode params = node.get("params");
@@ -83,7 +93,7 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient {
} catch (IOException e) { } catch (IOException e) {
logger.error("Error processing message", e); logger.error("Error processing message", e);
} }
} }
@Override @Override
@@ -105,4 +115,4 @@ public class EthereumWebSocketClient extends WebSocketJsonRpcClient {
activeSubscriptions.remove(subscriptionId); activeSubscriptions.remove(subscriptionId);
} }
} }
} }

View File

@@ -0,0 +1,195 @@
package stream.source.eventlog;
import org.apache.flink.api.connector.source.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.ElaboratedEventType;
import stream.dto.EventLog;
import stream.dto.EventLogFilter;
import stream.dto.EventType;
import java.io.IOException;
import java.net.URI;
import java.util.List;
/**
* A Flink Source that emits event logs from the Ethereum blockchain by subscribing to log events.
*/
public class EventLogSource<T extends EventLog> implements Source<T, EventLogSource.EventLogSplit, EventLogSource.EventLogEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(EventLogSource.class);

    private final URI websocketUri;
    private final EventLogFilter filter;
    private final EventType<T> eventType;

    /**
     * Creates a new EventLogSource.
     *
     * @param websocketUri The URI of the Ethereum websocket endpoint
     * @param filter       The filter to apply to the event logs
     * @param eventType    The event type describing the logs this source emits
     */
    public EventLogSource(URI websocketUri, EventLogFilter filter, EventType<T> eventType) {
        this.websocketUri = websocketUri;
        this.filter = filter;
        this.eventType = eventType;
    }

    @Override
    public Boundedness getBoundedness() {
        // This source is unbounded as it continuously receives new event logs
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<T, EventLogSplit> createReader(SourceReaderContext readerContext) {
        return new EventLogSourceReader<T>(websocketUri, filter, readerContext, eventType);
    }

    @Override
    public SplitEnumerator<EventLogSplit, EventLogEnumeratorState> createEnumerator(SplitEnumeratorContext<EventLogSplit> enumContext) {
        return new EventLogSplitEnumerator(enumContext);
    }

    @Override
    public SplitEnumerator<EventLogSplit, EventLogEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<EventLogSplit> enumContext, EventLogEnumeratorState state) {
        // The enumerator is stateless, so restoring ignores the checkpointed state.
        return new EventLogSplitEnumerator(enumContext);
    }

    @Override
    public SimpleVersionedSerializer<EventLogSplit> getSplitSerializer() {
        return new EventLogSplitSerializer();
    }

    @Override
    public SimpleVersionedSerializer<EventLogEnumeratorState> getEnumeratorCheckpointSerializer() {
        return new EventLogEnumeratorStateSerializer();
    }

    /**
     * A simple split for the EventLogSource. Since we're just subscribing to a stream of events,
     * we don't need to partition the source, so we use a single dummy split.
     */
    public static class EventLogSplit implements SourceSplit {
        private static final String SPLIT_ID = "eventlog-split";

        @Override
        public String splitId() {
            return SPLIT_ID;
        }
    }

    /**
     * A simple state for the EventLogSource enumerator. Since we're just subscribing to a stream of events,
     * we don't need to track any state, so this is just a placeholder.
     */
    public static class EventLogEnumeratorState {
        // No state needed
    }

    /**
     * A simple serializer for EventLogSplit.
     */
    private static class EventLogSplitSerializer implements SimpleVersionedSerializer<EventLogSplit> {
        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(EventLogSplit split) {
            // Since our split is just a dummy, we can return an empty byte array
            return new byte[0];
        }

        @Override
        public EventLogSplit deserialize(int version, byte[] serialized) {
            // Since our split is just a dummy, we can create a new instance
            return new EventLogSplit();
        }
    }

    /**
     * A simple serializer for EventLogEnumeratorState.
     */
    private static class EventLogEnumeratorStateSerializer implements SimpleVersionedSerializer<EventLogEnumeratorState> {
        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(EventLogEnumeratorState state) {
            // Since our state is just a dummy, we can return an empty byte array
            return new byte[0];
        }

        @Override
        public EventLogEnumeratorState deserialize(int version, byte[] serialized) {
            // Since our state is just a dummy, we can create a new instance
            return new EventLogEnumeratorState();
        }
    }

    /**
     * A simple enumerator for the EventLogSource. Since we're just subscribing to a stream of events,
     * we don't need to partition the source, so we just assign a single dummy split to the first reader.
     */
    private static class EventLogSplitEnumerator implements SplitEnumerator<EventLogSplit, EventLogEnumeratorState> {
        private final SplitEnumeratorContext<EventLogSplit> context;
        private boolean splitAssigned = false;

        public EventLogSplitEnumerator(SplitEnumeratorContext<EventLogSplit> context) {
            this.context = context;
        }

        @Override
        public void start() {
            // If we have any readers, assign the split
            if (!context.registeredReaders().isEmpty()) {
                assignSplit();
            }
        }

        @Override
        public void handleSplitRequest(int subtaskId, String requesterHostname) {
            // Ignore split requests, we assign splits proactively
        }

        @Override
        public void addSplitsBack(List<EventLogSplit> splits, int subtaskId) {
            // If a reader failed and we get splits back, we'll reassign them when a new reader registers
            splitAssigned = false;
        }

        @Override
        public void addReader(int subtaskId) {
            // When a new reader registers, assign the split if it hasn't been assigned yet
            if (!splitAssigned) {
                assignSplit();
            }
        }

        private void assignSplit() {
            if (!context.registeredReaders().isEmpty()) {
                // Assign the split to the first reader
                int firstReader = context.registeredReaders().keySet().iterator().next();
                context.assignSplit(new EventLogSplit(), firstReader);
                splitAssigned = true;
                LOG.info("Assigned EventLogSplit to reader {}", firstReader);
            }
        }

        @Override
        public EventLogEnumeratorState snapshotState(long checkpointId) throws Exception {
            return new EventLogEnumeratorState();
        }

        @Override
        public void close() throws IOException {
            // No resources to clean up
        }
    }
}

View File

@@ -0,0 +1,33 @@
package stream.source.eventlog;
import org.apache.flink.api.connector.source.Source;
import stream.dto.ElaboratedEventType;
import stream.dto.EventLog;
import stream.dto.EventLogFilter;
import stream.dto.EventType;
import java.net.URI;
/**
 * Factory methods for creating EventLogSource instances.
 */
public class EventLogSourceFactory {

    /**
     * Creates a new EventLogSource for the given websocket URI and event type.
     * The source subscribes to Ethereum log events filtered to the event type's
     * signature hash (topic 0).
     *
     * @param websocketUri the URI of the Ethereum websocket endpoint
     * @param eventType    the event type whose logs should be emitted
     * @return a new EventLogSource emitting logs of the given event type
     */
    public static <T extends EventLog> Source<T, ?, ?> createSource(URI websocketUri, EventType<T> eventType) {
        EventLogFilter filter = new EventLogFilter();
        filter.topics.add(new ElaboratedEventType<>(eventType).hash);
        return createSource(websocketUri, eventType, filter);
    }

    /**
     * Creates a new EventLogSource with an explicit, caller-supplied filter.
     *
     * @param websocketUri the URI of the Ethereum websocket endpoint
     * @param eventType    the event type whose logs should be emitted
     * @param filter       the log filter to subscribe with
     * @return a new EventLogSource emitting logs matching the filter
     */
    public static <T extends EventLog> Source<T, ?, ?> createSource(URI websocketUri, EventType<T> eventType, EventLogFilter filter) {
        return new EventLogSource<>(websocketUri, filter, eventType);
    }
}

View File

@@ -0,0 +1,127 @@
package stream.source.eventlog;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.ElaboratedEventType;
import stream.dto.EventLog;
import stream.dto.EventLogFilter;
import stream.dto.EventType;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
 * A Flink SourceReader that reads event logs from the Ethereum blockchain by subscribing to log events.
 *
 * <p>The reader lazily creates an {@link EventLogSubscriber} once a split has been
 * assigned, then drains the subscriber's queue on each {@link #pollNext}.
 */
public class EventLogSourceReader<T extends EventLog> implements SourceReader<T, EventLogSource.EventLogSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(EventLogSourceReader.class);

    private final URI websocketUri;
    private final EventLogFilter filter;
    private final List<EventLogSource.EventLogSplit> assignedSplits;
    private final EventType<T> eventType;
    // Created lazily in pollNext(), and only set once connect() has succeeded,
    // so a failed connection attempt is retried on the next poll.
    private EventLogSubscriber<T> subscriber;

    /**
     * Creates a new EventLogSourceReader.
     *
     * @param websocketUri The URI of the Ethereum websocket endpoint
     * @param filter The filter to apply to the event logs
     * @param context The source reader context (unused)
     * @param eventType The event type whose logs should be emitted
     */
    public EventLogSourceReader(URI websocketUri, EventLogFilter filter, @SuppressWarnings("unused") SourceReaderContext context, EventType<T> eventType) {
        this.websocketUri = websocketUri;
        this.filter = filter;
        this.assignedSplits = new ArrayList<>();
        this.eventType = eventType;
    }

    @Override
    public void start() {
        // The subscriber is not started here; it is created once a split arrives.
    }

    @SuppressWarnings("RedundantThrows")
    @Override
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        // Without a split there is nothing to read.
        if (assignedSplits.isEmpty()) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        // Lazily create and connect the subscriber. The field is assigned only
        // after connect() succeeds; previously it was assigned first, so a
        // connect failure left a dead, never-retried subscriber behind and the
        // source stalled silently.
        if (subscriber == null) {
            EventLogSubscriber<T> candidate = new EventLogSubscriber<>(websocketUri, filter, eventType);
            try {
                candidate.connect();
            } catch (Exception e) {
                LOG.error("Error connecting to Ethereum node", e);
                try {
                    candidate.close();
                } catch (Exception closeError) {
                    LOG.warn("Error closing partially-connected subscriber", closeError);
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
            subscriber = candidate;
            LOG.info("Connected to Ethereum node at {} with filter: {}", websocketUri, filter);
        }
        // Emit the next queued event log, if any.
        T eventLog = subscriber.getNextLogNonBlocking();
        if (eventLog != null) {
            output.collect(eventLog);
            LOG.debug("Emitted event log: {}", eventLog);
            return InputStatus.MORE_AVAILABLE;
        } else {
            return InputStatus.NOTHING_AVAILABLE;
        }
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        // NOTE(review): while the subscriber is running this returns an
        // already-completed future even when its queue is empty, which lets the
        // runtime poll in a tight loop — confirm whether a queue-aware signal
        // is needed here.
        if (subscriber != null && subscriber.isRunning()) {
            return CompletableFuture.completedFuture(null);
        }
        // Back off briefly before the runtime polls again. A delayed executor
        // completes the future after the wait instead of parking a common-pool
        // thread in Thread.sleep (which the previous supplyAsync variant did).
        return CompletableFuture.runAsync(() -> { },
                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));
    }

    @Override
    public List<EventLogSource.EventLogSplit> snapshotState(long checkpointId) {
        // No reader-internal state beyond the assigned splits themselves.
        return Collections.unmodifiableList(assignedSplits);
    }

    @Override
    public void addSplits(List<EventLogSource.EventLogSplit> splits) {
        LOG.info("Adding {} splits", splits.size());
        assignedSplits.addAll(splits);
    }

    @Override
    public void notifyNoMoreSplits() {
        LOG.info("No more splits will be assigned");
    }

    @Override
    public void close() throws Exception {
        if (subscriber != null) {
            subscriber.close();
        }
    }
}

View File

@@ -0,0 +1,170 @@
package stream.source.eventlog;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.*;
import stream.io.EthereumWebSocketClient;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * A utility class that subscribes to Ethereum event logs based on a filter and provides methods to get the logs.
 * This class is not a Flink source itself, but can be used by a custom Flink source implementation.
 *
 * <p>Incoming logs are decoded against the supplied {@link EventType} and buffered
 * in an unbounded queue until consumed via {@link #getNextLog()} or
 * {@link #getNextLogNonBlocking()}.
 */
public class EventLogSubscriber<T extends EventLog> implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(EventLogSubscriber.class);

    private final URI websocketUri;
    private final EventLogFilter filter;
    // Unbounded buffer between the websocket callback thread and the consumer.
    private final BlockingQueue<T> logQueue;
    private final AtomicBoolean running;
    private final ElaboratedEventType<T> eventType;
    private EthereumWebSocketClient client;

    /**
     * Creates a new EventLogSubscriber.
     *
     * @param websocketUri The URI of the Ethereum websocket endpoint
     * @param filter The filter to apply to the event logs
     * @param eventType The event type used to decode incoming logs
     */
    public EventLogSubscriber(URI websocketUri, EventLogFilter filter, EventType<T> eventType) {
        this.websocketUri = websocketUri;
        this.filter = filter;
        this.logQueue = new LinkedBlockingQueue<>();
        this.running = new AtomicBoolean(true);
        this.eventType = new ElaboratedEventType<>(eventType);
    }

    /**
     * Connects to the Ethereum node and subscribes to event logs based on the filter.
     *
     * @throws Exception If an error occurs while connecting or subscribing
     */
    public void connect() throws Exception {
        // Create and connect the client
        client = new EthereumWebSocketClient(websocketUri);
        client.connectBlocking();
        // Translate the EventLogFilter into the JSON-RPC filter object.
        ObjectNode filterParams = mapper.createObjectNode();
        // Add fromBlock and toBlock if specified
        if (filter.fromBlock != null) {
            filterParams.put("fromBlock", "0x" + Long.toHexString(filter.fromBlock));
        }
        if (filter.toBlock != null) {
            filterParams.put("toBlock", "0x" + Long.toHexString(filter.toBlock));
        }
        // Add addresses if specified
        if (filter.addresses != null && !filter.addresses.isEmpty()) {
            filterParams.set("address", mapper.valueToTree(filter.addresses));
        }
        // Add topics if specified
        if (filter.topics != null && !filter.topics.isEmpty()) {
            filterParams.set("topics", mapper.valueToTree(filter.topics));
        }
        // Add includeUnfinalized if specified
        if (filter.includeUnfinalized != null) {
            filterParams.put("includeUnfinalized", filter.includeUnfinalized);
        }
        // Subscribe to logs with the assembled filter.
        client.subscribeLogFilter(filterParams, this::processEventLog);
        LOG.info("Subscribed to event logs from {} with filter: {}", websocketUri, filter);
    }

    /**
     * Processes an event log received from the Ethereum node.
     * Decodes the log against the expected event type, populates a fresh
     * event-log instance from both the raw JSON and the decoded arguments,
     * and enqueues it. Decoding errors are logged and the log is dropped.
     *
     * @param jsonNode The JSON representation of the event log
     */
    private void processEventLog(JsonNode jsonNode) {
        try {
            var data = jsonNode.get("data").asText();
            var topicNodes = jsonNode.get("topics").elements();
            List<String> topics = new ArrayList<>();
            while (topicNodes.hasNext())
                topics.add(topicNodes.next().asText());
            // Topic 0 is the event signature hash; skip logs for other events.
            if (!topics.get(0).equals(eventType.hash)) {
                LOG.warn("Event signature mismatch. Expected: {}, Got: {}", eventType.hash, topics.get(0));
                return;
            }
            // Decode the indexed/unindexed arguments and expose them under
            // their declared parameter names.
            var argData = mapper.createObjectNode();
            var args = eventType.parseEventData(topics, data);
            int paramCount = eventType.paramNames.size();
            for (int i = 0; i < paramCount; i++)
                argData.set(eventType.paramNames.get(i), mapper.valueToTree(args.get(i)));
            @SuppressWarnings("unchecked") T eventLog = (T) this.eventType.eventLogClass.getDeclaredConstructor().newInstance();
            // Populate the instance first from the raw log JSON, then from the
            // decoded argument values (the latter wins on overlapping fields).
            ObjectReader updater = mapper.readerForUpdating(eventLog);
            updater.readValue(jsonNode);
            updater.readValue(argData);
            eventLog.args = args;
            // NOTE(review): topics[0] is the event signature hash, not an
            // address — confirm that 'sender' is really meant to carry the
            // signature rather than jsonNode's "address" field.
            eventLog.sender = topics.get(0);
            logQueue.add(eventLog);
            LOG.debug("Received event log: {}", eventLog);
        } catch (Exception e) {
            LOG.error("Error processing event log", e);
        }
    }

    // Shared, thread-safe mapper; lenient about unknown JSON properties.
    private static final ObjectMapper mapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /**
     * Gets the next event log from the queue, blocking if necessary until a log is available.
     *
     * @return The next event log
     * @throws InterruptedException If the thread is interrupted while waiting
     */
    public T getNextLog() throws InterruptedException {
        return logQueue.take();
    }

    /**
     * Gets the next event log from the queue, returning null if no log is available.
     *
     * @return The next event log, or null if no log is available
     */
    public T getNextLogNonBlocking() {
        return logQueue.poll();
    }

    /**
     * Checks if the subscriber is running.
     *
     * @return True if the subscriber is running, false otherwise
     */
    public boolean isRunning() {
        return running.get();
    }

    @Override
    public void close() throws Exception {
        // Signal consumers that no more logs will arrive.
        running.set(false);
        // Close the client if it exists
        if (client != null) {
            client.close();
        }
    }
}

View File

@@ -0,0 +1,470 @@
package stream.uniswap3;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import stream.dto.AddressId;
import stream.dto.BurnEventLog;
import stream.dto.EventLog;
import stream.dto.MintEventLog;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
/**
 * Keyed Flink process function that mirrors Uniswap v3's tick-based liquidity
 * accounting per pool (keyed by {@code AddressId}). Mint/Burn events update the
 * per-tick net-liquidity map; a long[] bitmap over the full tick range records
 * which ticks currently hold a non-zero net, so range scans can skip empty
 * 64-tick words. All state lives in Flink keyed state (state v2 API).
 * Events are passed through unchanged after their effect is applied.
 */
public class OptimizedLiquidityTracker<T extends EventLog> extends KeyedProcessFunction<AddressId, T, T> {
    // Uniswap V3 tick bounds
    private static final int MIN_TICK = -887272; // at feeTier 0.01%
    private static final int MAX_TICK = 887272;
    private static final int TICK_RANGE = MAX_TICK - MIN_TICK + 1;
    // Using BitSet for compact storage of which ticks are populated
    private static final int BITS_PER_WORD = 64; // Since we'll use long[]
    private static final int NUM_WORDS = (TICK_RANGE + BITS_PER_WORD - 1) / BITS_PER_WORD;
    // Net liquidity change when crossing each tick (Uniswap's liquidityNet).
    private MapState<Integer, BigInteger> liquidityNetByTick;
    // Active liquidity at the current tick (Uniswap's global liquidity).
    private ValueState<BigInteger> currentLiquidity;
    // The pool's current tick.
    private ValueState<Integer> currentTick;
    // Bit i set <=> tick (MIN_TICK + i) has a non-zero entry in liquidityNetByTick.
    private ValueState<long[]> tickBitmap; // Using long[] for the bitmap

    /** Registers the keyed state handles used by this function. */
    @Override
    public void open(OpenContext openContext) throws Exception {
        MapStateDescriptor<Integer, BigInteger> mapDescriptor =
                new MapStateDescriptor<>(
                        "liquidity-net-by-tick",
                        Types.INT,
                        TypeInformation.of(new TypeHint<BigInteger>() {})
                );
        ValueStateDescriptor<BigInteger> liquidityDescriptor =
                new ValueStateDescriptor<>(
                        "current-liquidity",
                        TypeInformation.of(new TypeHint<BigInteger>() {})
                );
        ValueStateDescriptor<Integer> tickDescriptor =
                new ValueStateDescriptor<>(
                        "current-tick",
                        Types.INT
                );
        ValueStateDescriptor<long[]> bitmapDescriptor =
                new ValueStateDescriptor<>(
                        "tick-bitmap",
                        TypeInformation.of(new TypeHint<long[]>() {})
                );
        liquidityNetByTick = getRuntimeContext().getMapState(mapDescriptor);
        currentLiquidity = getRuntimeContext().getState(liquidityDescriptor);
        currentTick = getRuntimeContext().getState(tickDescriptor);
        tickBitmap = getRuntimeContext().getState(bitmapDescriptor);
    }

    /**
     * Applies Mint/Burn events to this pool's liquidity state and forwards the
     * event unchanged. Other event types pass through without side effects.
     */
    @Override
    public void processElement(T event, KeyedProcessFunction<AddressId, T, T>.Context context, Collector<T> out) throws Exception {
        // Initialize if needed
        if (currentLiquidity.value() == null) {
            currentLiquidity.update(BigInteger.ZERO);
            currentTick.update(0);
            tickBitmap.update(new long[NUM_WORDS]);
        }
        if (event instanceof MintEventLog m) {
            mint(m.tickLower, m.tickUpper, m.amount);
        }
        else if (event instanceof BurnEventLog b) {
            burn(b.tickLower, b.tickUpper, b.amount);
        }
        out.collect(event);
    }

    /**
     * Adds liquidity in a tick range [tickLower, tickUpper)
     * @param tickLower The lower tick (inclusive)
     * @param tickUpper The upper tick (exclusive)
     * @param amount The amount of liquidity to add
     * @throws Exception on state access failure or invalid parameters
     */
    public void mint(int tickLower, int tickUpper, BigInteger amount) throws Exception {
        validateStakeParams(tickLower, tickUpper, amount);
        // Add liquidity at lower tick boundary (entering the position)
        updateLiquidityNetAtTick(tickLower, amount);
        // Remove liquidity at upper tick boundary (exiting the position)
        updateLiquidityNetAtTick(tickUpper, amount.negate());
        // If current tick is within range, update current liquidity
        int currentTickValue = currentTick.value();
        if (currentTickValue >= tickLower && currentTickValue < tickUpper) {
            BigInteger newLiquidity = currentLiquidity.value().add(amount);
            currentLiquidity.update(newLiquidity);
        }
    }

    /**
     * Removes liquidity from a tick range [tickLower, tickUpper)
     * @param tickLower The lower tick (inclusive)
     * @param tickUpper The upper tick (exclusive)
     * @param amount The amount of liquidity to remove
     * @throws Exception on state access failure or invalid parameters
     */
    public void burn(int tickLower, int tickUpper, BigInteger amount) throws Exception {
        validateStakeParams(tickLower, tickUpper, amount);
        // Remove liquidity at lower tick boundary (reverse of mint)
        updateLiquidityNetAtTick(tickLower, amount.negate());
        // Add liquidity at upper tick boundary (reverse of mint)
        updateLiquidityNetAtTick(tickUpper, amount);
        // If current tick is within range, update current liquidity
        int currentTickValue = currentTick.value();
        if (currentTickValue >= tickLower && currentTickValue < tickUpper) {
            BigInteger currentLiquidityValue = currentLiquidity.value();
            if (currentLiquidityValue.compareTo(amount) < 0) {
                throw new IllegalStateException("Insufficient liquidity to burn");
            }
            BigInteger newLiquidity = currentLiquidityValue.subtract(amount);
            currentLiquidity.update(newLiquidity);
        }
    }

    /** Validates common mint/burn preconditions (ordering, bounds, positivity). */
    private void validateStakeParams(int tickLower, int tickUpper, BigInteger amount) {
        if (tickLower >= tickUpper) {
            throw new IllegalArgumentException("tickLower must be less than tickUpper");
        }
        if (tickLower < MIN_TICK || tickUpper > MAX_TICK) {
            throw new IllegalArgumentException("Tick range outside bounds");
        }
        if (amount.compareTo(BigInteger.ZERO) <= 0) {
            throw new IllegalArgumentException("Amount must be positive");
        }
    }

    /**
     * Adjusts the net liquidity at a single tick and keeps the bitmap in sync:
     * the bit is cleared when the net becomes exactly zero (and the map entry
     * is removed), set otherwise.
     */
    private void updateLiquidityNetAtTick(int tick, BigInteger liquidityDelta) throws Exception {
        if (tick < MIN_TICK || tick > MAX_TICK) {
            throw new IllegalArgumentException("Tick outside allowed range");
        }
        BigInteger currentNet = liquidityNetByTick.contains(tick)
                ? liquidityNetByTick.get(tick)
                : BigInteger.ZERO;
        BigInteger newNet = currentNet.add(liquidityDelta);
        // Update bitmap
        long[] bitmap = tickBitmap.value();
        int normalizedTick = tick - MIN_TICK;
        int wordIndex = normalizedTick / BITS_PER_WORD;
        int bitIndex = normalizedTick % BITS_PER_WORD;
        if (newNet.equals(BigInteger.ZERO)) {
            liquidityNetByTick.remove(tick);
            // Clear bit
            bitmap[wordIndex] &= ~(1L << bitIndex);
        } else {
            liquidityNetByTick.put(tick, newNet);
            // Set bit
            bitmap[wordIndex] |= (1L << bitIndex);
        }
        tickBitmap.update(bitmap);
    }

    /**
     * Moves the current tick to newTick, applying the liquidityNet of every
     * populated tick crossed on the way. Moving up applies ticks in
     * (oldTick, newTick] additively; moving down applies ticks in
     * (newTick, oldTick] subtractively — the asymmetry matches Uniswap's
     * boundary-crossing semantics.
     */
    private void crossTick(int newTick) throws Exception {
        int oldTick = currentTick.value();
        if (newTick == oldTick) {
            return;
        }
        BigInteger liquidityValue = currentLiquidity.value();
        long[] bitmap = tickBitmap.value();
        if (newTick > oldTick) {
            // Search forward
            for (int tick = oldTick + 1; tick <= newTick; tick++) {
                if (tick < MIN_TICK || tick > MAX_TICK) continue;
                int normalizedTick = tick - MIN_TICK;
                int wordIndex = normalizedTick / BITS_PER_WORD;
                int bitIndex = normalizedTick % BITS_PER_WORD;
                // Check if this tick has liquidity
                if ((bitmap[wordIndex] & (1L << bitIndex)) != 0) {
                    liquidityValue = liquidityValue.add(liquidityNetByTick.get(tick));
                }
            }
        } else {
            // Search backward
            for (int tick = oldTick; tick > newTick; tick--) {
                if (tick < MIN_TICK || tick > MAX_TICK) continue;
                int normalizedTick = tick - MIN_TICK;
                int wordIndex = normalizedTick / BITS_PER_WORD;
                int bitIndex = normalizedTick % BITS_PER_WORD;
                // Check if this tick has liquidity
                if ((bitmap[wordIndex] & (1L << bitIndex)) != 0) {
                    liquidityValue = liquidityValue.subtract(liquidityNetByTick.get(tick));
                }
            }
        }
        currentLiquidity.update(liquidityValue);
        currentTick.update(newTick);
    }

    // Helper method to find next initialized tick.
    // NOTE(review): not referenced anywhere in this class — presumably kept for
    // future swap handling; confirm before removing.
    private int nextInitializedTick(int tick, boolean lte) throws Exception {
        if (tick < MIN_TICK || tick > MAX_TICK) {
            return lte ? MIN_TICK : MAX_TICK;
        }
        long[] bitmap = tickBitmap.value();
        int normalizedTick = tick - MIN_TICK;
        int wordIndex = normalizedTick / BITS_PER_WORD;
        int bitIndex = normalizedTick % BITS_PER_WORD;
        if (lte) {
            // Search backwards
            // Check current word (bits strictly below bitIndex)
            long mask = (1L << bitIndex) - 1;
            long bits = bitmap[wordIndex] & mask;
            if (bits != 0) {
                return MIN_TICK + (wordIndex * BITS_PER_WORD) + (63 - Long.numberOfLeadingZeros(bits));
            }
            // Check previous words
            for (int i = wordIndex - 1; i >= 0; i--) {
                if (bitmap[i] != 0) {
                    return MIN_TICK + (i * BITS_PER_WORD) + (63 - Long.numberOfLeadingZeros(bitmap[i]));
                }
            }
            return MIN_TICK;
        } else {
            // Search forwards
            // Check current word (bits at or above bitIndex)
            long mask = ~((1L << bitIndex) - 1);
            long bits = bitmap[wordIndex] & mask;
            if (bits != 0) {
                return MIN_TICK + (wordIndex * BITS_PER_WORD) + Long.numberOfTrailingZeros(bits);
            }
            // Check following words
            for (int i = wordIndex + 1; i < NUM_WORDS; i++) {
                if (bitmap[i] != 0) {
                    return MIN_TICK + (i * BITS_PER_WORD) + Long.numberOfTrailingZeros(bitmap[i]);
                }
            }
            return MAX_TICK;
        }
    }

    /**
     * Calculate the total liquidity between two ticks without modifying state
     * @param fromTick The starting tick (inclusive)
     * @param toTick The ending tick (exclusive)
     * @return The net liquidity that would be encountered moving from fromTick to toTick
     * @throws Exception on state access failure or invalid parameters
     */
    public BigInteger calculateLiquidityInRange(int fromTick, int toTick) throws Exception {
        if (fromTick >= toTick) {
            throw new IllegalArgumentException("fromTick must be less than toTick");
        }
        if (fromTick < MIN_TICK || toTick > MAX_TICK) {
            throw new IllegalArgumentException("Tick range outside bounds");
        }
        BigInteger liquidityDelta = BigInteger.ZERO;
        long[] bitmap = tickBitmap.value();
        // Determine direction and adjust loop accordingly
        int startTick = fromTick;
        int endTick = toTick;
        // Convert to bitmap indices
        int startNormalizedTick = startTick - MIN_TICK;
        int endNormalizedTick = endTick - MIN_TICK;
        int startWord = startNormalizedTick / BITS_PER_WORD;
        int endWord = endNormalizedTick / BITS_PER_WORD;
        // Optimize by checking whole words when possible
        for (int wordIndex = startWord; wordIndex <= endWord; wordIndex++) {
            long word = bitmap[wordIndex];
            if (word == 0) {
                continue; // Skip empty words
            }
            // Calculate bit range for this word
            int wordStartBit = (wordIndex == startWord) ?
                    startNormalizedTick % BITS_PER_WORD : 0;
            int wordEndBit = (wordIndex == endWord) ?
                    endNormalizedTick % BITS_PER_WORD : BITS_PER_WORD;
            // Create mask for relevant bits in this word.
            // Java shift counts are taken mod 64, so degenerate bit ranges can
            // yield an over-wide mask; the explicit tick-range check below
            // filters out any ticks the mask lets through.
            long mask = -1L >>> (BITS_PER_WORD - (wordEndBit - wordStartBit));
            mask = mask << wordStartBit;
            long relevantBits = word & mask;
            // Process each set bit in the word
            while (relevantBits != 0) {
                int bitIndex = Long.numberOfTrailingZeros(relevantBits);
                int tick = MIN_TICK + (wordIndex * BITS_PER_WORD) + bitIndex;
                if (tick >= fromTick && tick < toTick) {
                    BigInteger liquidityAtTick = liquidityNetByTick.get(tick);
                    if (liquidityAtTick != null) {
                        liquidityDelta = liquidityDelta.add(liquidityAtTick);
                    }
                }
                // Clear the processed bit
                relevantBits &= ~(1L << bitIndex);
            }
        }
        return liquidityDelta;
    }

    /**
     * Overloaded version that returns both the net liquidity change and the absolute liquidity
     * at each point in the range
     * @param fromTick The starting tick (inclusive)
     * @param toTick The ending tick (exclusive)
     * @return A list of liquidity points
     * @throws Exception on state access failure or invalid parameters
     */
    public Pair<BigInteger, List<LiquidityPoint>> calculateLiquidityInRangeDetailed(
            int fromTick,
            int toTick
    ) throws Exception {
        if (fromTick >= toTick) {
            throw new IllegalArgumentException("fromTick must be less than toTick");
        }
        if (fromTick < MIN_TICK || toTick > MAX_TICK) {
            throw new IllegalArgumentException("Tick range outside bounds");
        }
        List<LiquidityPoint> points = new ArrayList<>();
        // NOTE: runningLiquidity starts from ZERO, i.e. values are relative to
        // the start of the range, not the pool's global liquidity.
        BigInteger runningLiquidity = BigInteger.ZERO;
        BigInteger netChange = BigInteger.ZERO;
        long[] bitmap = tickBitmap.value();
        int startNormalizedTick = fromTick - MIN_TICK;
        int endNormalizedTick = toTick - MIN_TICK;
        int startWord = startNormalizedTick / BITS_PER_WORD;
        int endWord = endNormalizedTick / BITS_PER_WORD;
        for (int wordIndex = startWord; wordIndex <= endWord; wordIndex++) {
            long word = bitmap[wordIndex];
            if (word == 0) {
                continue;
            }
            int wordStartBit = (wordIndex == startWord) ?
                    startNormalizedTick % BITS_PER_WORD : 0;
            int wordEndBit = (wordIndex == endWord) ?
                    endNormalizedTick % BITS_PER_WORD : BITS_PER_WORD;
            // See calculateLiquidityInRange: mod-64 shifts are guarded by the
            // tick-range check inside the loop.
            long mask = -1L >>> (BITS_PER_WORD - (wordEndBit - wordStartBit));
            mask = mask << wordStartBit;
            long relevantBits = word & mask;
            while (relevantBits != 0) {
                int bitIndex = Long.numberOfTrailingZeros(relevantBits);
                int tick = MIN_TICK + (wordIndex * BITS_PER_WORD) + bitIndex;
                if (tick >= fromTick && tick < toTick) {
                    BigInteger liquidityAtTick = liquidityNetByTick.get(tick);
                    if (liquidityAtTick != null) {
                        netChange = netChange.add(liquidityAtTick);
                        runningLiquidity = runningLiquidity.add(liquidityAtTick);
                        points.add(new LiquidityPoint(tick, liquidityAtTick, runningLiquidity));
                    }
                }
                relevantBits &= ~(1L << bitIndex);
            }
        }
        return Pair.of(netChange, points);
    }

    /**
     * Returns a list of liquidities for each tick in the given range.
     * Values are cumulative net liquidity relative to the start of the range
     * (the running total starts at zero, not at the pool's global liquidity).
     * @param fromTick The starting tick (inclusive)
     * @param toTick The ending tick (exclusive)
     * @return List of liquidity amounts, where index 0 corresponds to fromTick
     * @throws Exception on state access failure or invalid parameters
     */
    public List<BigInteger> getLiquidityRange(int fromTick, int toTick) throws Exception {
        if (fromTick >= toTick) {
            throw new IllegalArgumentException("fromTick must be less than toTick");
        }
        if (fromTick < MIN_TICK || toTick > MAX_TICK) {
            throw new IllegalArgumentException("Tick range outside bounds");
        }
        int size = toTick - fromTick;
        List<BigInteger> liquidities = new ArrayList<>(size);
        BigInteger runningLiquidity = BigInteger.ZERO;
        // Initialize all positions with zero
        for (int i = 0; i < size; i++) {
            liquidities.add(BigInteger.ZERO);
        }
        // Use bitmap to efficiently find populated ticks
        long[] bitmap = tickBitmap.value();
        int startNormalizedTick = fromTick - MIN_TICK;
        int endNormalizedTick = toTick - MIN_TICK;
        int startWord = startNormalizedTick / BITS_PER_WORD;
        int endWord = endNormalizedTick / BITS_PER_WORD;
        // Process each word in the bitmap
        for (int wordIndex = startWord; wordIndex <= endWord; wordIndex++) {
            long word = bitmap[wordIndex];
            if (word == 0) {
                continue;
            }
            int wordStartBit = (wordIndex == startWord) ?
                    startNormalizedTick % BITS_PER_WORD : 0;
            int wordEndBit = (wordIndex == endWord) ?
                    endNormalizedTick % BITS_PER_WORD : BITS_PER_WORD;
            long mask = -1L >>> (BITS_PER_WORD - (wordEndBit - wordStartBit));
            mask = mask << wordStartBit;
            long relevantBits = word & mask;
            while (relevantBits != 0) {
                int bitIndex = Long.numberOfTrailingZeros(relevantBits);
                int tick = MIN_TICK + (wordIndex * BITS_PER_WORD) + bitIndex;
                if (tick >= fromTick && tick < toTick) {
                    BigInteger liquidityAtTick = liquidityNetByTick.get(tick);
                    if (liquidityAtTick != null) {
                        runningLiquidity = runningLiquidity.add(liquidityAtTick);
                        // Update all subsequent positions with the new running liquidity
                        for (int i = tick - fromTick; i < size; i++) {
                            liquidities.set(i, runningLiquidity);
                        }
                    }
                }
                relevantBits &= ~(1L << bitIndex);
            }
        }
        return liquidities;
    }

    /**
     * Helper class to represent liquidity at a specific tick
     */
    public record LiquidityPoint(int tick, BigInteger liquidityDelta, BigInteger liquidityAfter) {}
}

View File

@@ -0,0 +1,69 @@
# TickLiquidityTracker
## Overview
The `TickLiquidityTracker` class replicates Uniswap v3's tick-based liquidity tracking system. In Uniswap v3, liquidity is concentrated within specific price ranges defined by ticks, rather than being distributed across the entire price curve as in previous versions.
## How Uniswap v3 Tracks Liquidity
Uniswap v3 uses a clever data structure to track liquidity:
1. **Tick-Based Liquidity**: Each tick represents a specific price point. Liquidity providers can add liquidity within a range defined by a lower and upper tick.
2. **Net Liquidity Changes**: At each tick boundary, Uniswap records the net change in liquidity that occurs when crossing that tick. This is stored as `liquidityNet` in the Uniswap v3 contracts.
3. **Global Liquidity Tracking**: The protocol maintains a global `liquidity` variable that represents the current active liquidity at the current tick.
4. **Efficient Range Queries**: To determine the liquidity at any given tick, Uniswap doesn't need to iterate through all positions. Instead, it can calculate it by starting with the global liquidity and adding/subtracting the net liquidity changes when crossing ticks.
## Implementation Details
The `TickLiquidityTracker` class implements this system with the following components:
1. **Data Structures**:
- `TreeMap<Integer, BigInteger> liquidityNetByTick`: Stores the net liquidity change at each tick
- `BigInteger liquidity`: Tracks the current global liquidity
- `int currentTick`: Tracks the current tick
2. **Key Methods**:
- `updateLiquidityOnMint(int tickLower, int tickUpper, BigInteger amount)`: Updates liquidity when a new position is created
- `updateLiquidityOnBurn(int tickLower, int tickUpper, BigInteger amount)`: Updates liquidity when a position is removed
- `crossTick(int newTick)`: Updates the current tick and global liquidity when crossing ticks
- `getLiquidityAtTick(int tick)`: Calculates the liquidity at a specific tick without changing the current state
## Usage Example
```java
// Create a new tracker
TickLiquidityTracker tracker = new TickLiquidityTracker();
// Add a position from tick 10 to 20 with 500 liquidity
tracker.updateLiquidityOnMint(10, 20, BigInteger.valueOf(500));
// Move to tick 15 (inside the position range)
tracker.crossTick(15);
// Get current liquidity
BigInteger currentLiquidity = tracker.getCurrentLiquidity();
// Calculate liquidity at a different tick without moving
BigInteger liquidityAtTick12 = tracker.getLiquidityAtTick(12);
```
For a more comprehensive demonstration, see the `TickLiquidityTrackerDemo` class.
## Performance Considerations
- The implementation uses a `TreeMap` to efficiently store and query tick data
- Crossing ticks has O(k) complexity where k is the number of ticks crossed
- Calculating liquidity at a specific tick has O(k) complexity where k is the number of ticks between the current tick and the target tick
## Comparison with Uniswap v3 Implementation
This implementation closely follows the approach used in Uniswap v3's core contracts, particularly:
1. The use of net liquidity changes at tick boundaries
2. The method of calculating liquidity by traversing ticks
3. The efficient handling of tick crossings
The main difference is that this implementation is in Java rather than Solidity, and uses `BigInteger` for precise arithmetic instead of Solidity's fixed-point arithmetic.

View File

@@ -0,0 +1,218 @@
package stream.uniswap3;
import java.io.Serial;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Map;
import java.util.TreeMap;
/**
* TickLiquidityTracker replicates Uniswap v3's tick-based liquidity tracking system.
*
* In Uniswap v3, liquidity is concentrated within specific price ranges defined by ticks.
* The protocol tracks:
* 1. The net liquidity change at each tick (liquidityNet)
* 2. The current global liquidity (liquidity)
*
* When the price crosses a tick, the global liquidity is updated by adding the net liquidity
* at that tick. This allows efficient calculation of liquidity at any price range.
*/
public class TickLiquidityTracker implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
// TreeMap to store the net liquidity change at each tick
// Key: tick index, Value: net liquidity change when crossing this tick
private final TreeMap<Integer, BigInteger> liquidityNetByTick;
// Current active liquidity
private BigInteger liquidity;
// Current tick
private int currentTick;
/**
 * Creates a new TickLiquidityTracker with zero initial liquidity, positioned
 * at tick 0.
 */
public TickLiquidityTracker() {
    this(BigInteger.ZERO, 0);
}
/**
 * Creates a new TickLiquidityTracker with specified initial liquidity and tick.
 * The per-tick net-liquidity map starts empty.
 *
 * @param initialLiquidity The initial global liquidity
 * @param initialTick The initial current tick
 */
public TickLiquidityTracker(BigInteger initialLiquidity, int initialTick) {
    this.liquidityNetByTick = new TreeMap<>();
    this.liquidity = initialLiquidity;
    this.currentTick = initialTick;
}
/**
 * Updates the liquidity when a position is minted over [tickLower, tickUpper).
 * Records +amount at the lower boundary and -amount at the upper boundary, and
 * bumps the global liquidity if the current tick sits inside the range.
 *
 * @param tickLower The lower tick of the position
 * @param tickUpper The upper tick of the position
 * @param amount The amount of liquidity added
 */
public void updateLiquidityOnMint(int tickLower, int tickUpper, BigInteger amount) {
    // Crossing up into the range adds the position's liquidity...
    updateLiquidityNetAtTick(tickLower, amount);
    // ...and crossing up out of the range removes it again.
    updateLiquidityNetAtTick(tickUpper, amount.negate());
    boolean inRange = currentTick >= tickLower && currentTick < tickUpper;
    if (inRange) {
        liquidity = liquidity.add(amount);
    }
}
/**
 * Updates the liquidity when a position over [tickLower, tickUpper) is burned.
 * Exactly reverses {@link #updateLiquidityOnMint}: removes amount at the lower
 * boundary, restores it at the upper boundary, and reduces the global
 * liquidity if the current tick sits inside the range.
 *
 * @param tickLower The lower tick of the position
 * @param tickUpper The upper tick of the position
 * @param amount The amount of liquidity removed
 */
public void updateLiquidityOnBurn(int tickLower, int tickUpper, BigInteger amount) {
    updateLiquidityNetAtTick(tickLower, amount.negate());
    updateLiquidityNetAtTick(tickUpper, amount);
    boolean inRange = currentTick >= tickLower && currentTick < tickUpper;
    if (inRange) {
        liquidity = liquidity.subtract(amount);
    }
}
/**
 * Accumulates a liquidity delta into the net liquidity stored at a tick,
 * creating the entry if the tick was previously untracked.
 *
 * @param tick The tick index
 * @param liquidityDelta The change in liquidity
 */
private void updateLiquidityNetAtTick(int tick, BigInteger liquidityDelta) {
    liquidityNetByTick.merge(tick, liquidityDelta, BigInteger::add);
}
/**
 * Moves the current tick and updates the global liquidity accordingly.
 *
 * <p>Moving up from tick t to t' crosses the boundaries of ticks (t, t'] and
 * adds their net liquidity; moving down crosses (t', t] and subtracts it.
 * The previous subMap calls were exclusive on both ends, so the boundary tick
 * actually crossed (the destination tick moving up, the starting tick moving
 * down) was never applied — e.g. crossing up exactly onto a position's lower
 * tick failed to activate that position.
 *
 * @param newTick The new current tick
 */
public void crossTick(int newTick) {
    // If moving to the same tick, do nothing
    if (newTick == currentTick) {
        return;
    }
    // Determine direction of tick movement
    boolean isMovingUp = newTick > currentTick;
    // Get all ticks that will be crossed
    Map<Integer, BigInteger> crossedTicks;
    if (isMovingUp) {
        // Moving up crosses ticks in (currentTick, newTick]: the destination
        // boundary is crossed, the starting tick is not.
        crossedTicks = liquidityNetByTick.subMap(currentTick, false, newTick, true);
    } else {
        // Moving down crosses ticks in (newTick, currentTick]: the starting
        // boundary is crossed on the way down, the destination tick is not.
        crossedTicks = liquidityNetByTick.subMap(newTick, false, currentTick, true);
    }
    // Update liquidity by applying all crossed ticks
    for (Map.Entry<Integer, BigInteger> entry : crossedTicks.entrySet()) {
        if (isMovingUp) {
            // When moving up, add the net liquidity
            liquidity = liquidity.add(entry.getValue());
        } else {
            // When moving down, subtract the net liquidity
            liquidity = liquidity.subtract(entry.getValue());
        }
    }
    // Update current tick
    currentTick = newTick;
}
/**
 * Computes what the global liquidity would be if the current tick were
 * {@code tick}, by folding the net-liquidity deltas between the current
 * tick and the target tick into the current liquidity (without mutating
 * any state).
 *
 * NOTE(review): like crossTick, both subMap ranges below exclude BOTH
 * endpoints, so the target tick's own net liquidity is never applied.
 * This mirrors crossTick's convention exactly (the two methods agree),
 * but differs from Uniswap v3's half-open crossed range — confirm the
 * intended tick convention.
 *
 * @param tick The tick to calculate liquidity for
 * @return The liquidity at the specified tick
 */
public BigInteger getLiquidityAtTick(int tick) {
// If the requested tick is the current tick, return current liquidity
if (tick == currentTick) {
return liquidity;
}
BigInteger result = liquidity;
// If the requested tick is above current tick
if (tick > currentTick) {
// Add all liquidityNet values in the open interval (currentTick, tick)
Map<Integer, BigInteger> ticksToAdd = liquidityNetByTick.subMap(currentTick, false, tick, false);
for (BigInteger delta : ticksToAdd.values()) {
result = result.add(delta);
}
}
// If the requested tick is below current tick
else {
// Subtract all liquidityNet values in the open interval (tick, currentTick)
Map<Integer, BigInteger> ticksToSubtract = liquidityNetByTick.subMap(tick, false, currentTick, false);
for (BigInteger delta : ticksToSubtract.values()) {
result = result.subtract(delta);
}
}
return result;
}
/**
 * Returns the liquidity that is currently active (in range) for the pool.
 *
 * @return The current global liquidity
 */
public BigInteger getCurrentLiquidity() {
    return this.liquidity;
}
/**
 * Returns the tick the pool price currently sits at.
 *
 * @return The current tick
 */
public int getCurrentTick() {
    return this.currentTick;
}
/**
 * Returns the net liquidity change recorded at a tick, or zero when no
 * delta has ever been recorded there.
 *
 * @param tick The tick index
 * @return The net liquidity change at the specified tick
 */
public BigInteger getLiquidityNetAtTick(int tick) {
    BigInteger net = liquidityNetByTick.get(tick);
    return net != null ? net : BigInteger.ZERO;
}
/**
 * Returns a sorted defensive copy of every recorded tick-to-net-liquidity
 * entry. Note that entries whose net has returned to zero (e.g. after a
 * mint followed by an equal burn) are never removed and so still appear
 * in the returned map.
 *
 * @return A map of tick indices to their net liquidity changes
 */
public Map<Integer, BigInteger> getAllLiquidityNetTicks() {
    // Copy-construct so callers cannot mutate internal state.
    Map<Integer, BigInteger> snapshot = new TreeMap<>(liquidityNetByTick);
    return snapshot;
}
}