initial checkin with basic arbitrum head blocks working

This commit is contained in:
tim
2025-05-17 20:28:20 -04:00
commit 0cd8e1e28f
24 changed files with 1300 additions and 0 deletions

View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package stream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.ParameterTool;
import stream.dto.ArbitrumOneBlock;
import stream.source.newheads.NewHeadsSourceFactory;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
public class DataStreamJob {
private static final Logger log = LoggerFactory.getLogger(DataStreamJob.class);
public static void main(String[] args) throws Exception {
// from lowest priority to highest
// start by testing for a "job.properties" file
ParameterTool parameterTool;
try {
parameterTool = ParameterTool.fromPropertiesFile("job.properties");
log.info("loaded job.properties");
} catch (IOException e) {
log.info("job.properties file not found.");
parameterTool = ParameterTool.fromMap(Collections.emptyMap());
}
// and now in increasing priority:
ParameterTool parameters = parameterTool
.mergeWith(ParameterTool.fromSystemProperties()) // System properties override
.mergeWith(ParameterTool.fromMap(System.getenv())) // Environment variables override
.mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
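// Example (hypothetical endpoints): a CLI flag beats an environment variable,
// which beats job.properties:
//   flink run job.jar --ws_url ws://my-node:8546 --rpc_url http://my-node:8545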
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// do not do this until considering how secrets are handled by flink
// env.getConfig().setGlobalJobParameters(parameters);
URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
// Create ObjectMapper for pretty JSON printing
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
DataStream<ArbitrumOneBlock> blockStream = env
.fromSource(
NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
WatermarkStrategy.noWatermarks(),
"Ethereum Blocks Source",
TypeInformation.of(ArbitrumOneBlock.class)
);
// Map the blocks to pretty-printed JSON strings
blockStream
.map(block -> {
try {
return mapper.writeValueAsString(block);
} catch (Exception e) {
return "Error converting block to JSON: " + e.getMessage();
}
})
.print("New Ethereum Block: ");
env.execute("Ethereum Block Stream");
}
}

View File

@@ -0,0 +1,13 @@
package stream.dto;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import stream.io.HexLongDeserializer;
import stream.io.HexLongSerializer;
public class ArbitrumOneBlock extends EthereumBlock {
public String l1BlockNumber;
public String sendRoot;
public String sendCount;
// the node reports these quantities as hex strings ("0x..."), so reuse the
// hex codecs EthereumBlock already applies to number and timestamp
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long l1GasPrice;
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long l1GasUsed;
public String l1Hash;
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long l1Timestamp;
public String sequencerAddress;
}

View File

@@ -0,0 +1,5 @@
package stream.dto;
public class Block {
public Long chainId;
}

View File

@@ -0,0 +1,10 @@
package stream.dto;
public class BlockHash extends BlockId {
public String hash;
@Override
public Object getId() {
return this.hash;
}
}

View File

@@ -0,0 +1,6 @@
package stream.dto;
public abstract class BlockId extends Block {
abstract public Object getId();
}

View File

@@ -0,0 +1,10 @@
package stream.dto;
public class BlockNumber extends BlockId {
public long number;
@Override
public Object getId() {
return this.number;
}
}

View File

@@ -0,0 +1,35 @@
package stream.dto;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import stream.io.HexLongDeserializer;
import stream.io.HexLongSerializer;
import java.util.List;
public class EthereumBlock extends Block {
public String hash;
@JsonSerialize(using = HexLongSerializer.class)
@JsonDeserialize(using = HexLongDeserializer.class)
public Long number;
public String parentHash;
public String nonce;
public String sha3Uncles;
public String logsBloom;
public String transactionsRoot;
public String stateRoot;
public String receiptsRoot;
public String miner;
public String difficulty;
public String totalDifficulty;
public String extraData;
@JsonSerialize(using = HexLongSerializer.class)
@JsonDeserialize(using = HexLongDeserializer.class)
public Long size;
public String gasLimit;
public String gasUsed;
@JsonSerialize(using = HexLongSerializer.class)
@JsonDeserialize(using = HexLongDeserializer.class)
public Long timestamp;
public List<String> transactions;
public List<String> uncles;
}

View File

@@ -0,0 +1,14 @@
package stream.dto;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import stream.io.HexLongDeserializer;
import stream.io.HexLongSerializer;
import java.util.List;
public class EventLog {
public boolean removed; // true if log was removed due to chain reorg
// quantities arrive as hex strings ("0x..."), hence the hex codecs
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long logIndex;
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long transactionIndex;
public String transactionHash;
public String blockHash;
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long blockNumber;
public String address; // contract that emitted the event
public String data; // contains the non-indexed parameters
public List<String> topics; // contains the event signature and indexed parameters
}

View File

@@ -0,0 +1,35 @@
package stream.dto;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import stream.io.HexLongDeserializer;
import stream.io.HexLongSerializer;
import java.util.List;
public class Transaction {
public String hash;
public String nonce;
public String blockHash;
// quantities arrive as hex strings ("0x..."), hence the hex codecs
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long blockNumber;
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long transactionIndex;
public String from;
public String to;
public String value;
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long gas;
public String gasPrice;
public String input;
public String r;
public String s;
public String v;
public String creates; // contract creation address if this tx created a contract
public String chainId;
public String type; // transaction type (0 = legacy, 1 = EIP-2930, 2 = EIP-1559)
// EIP-1559 specific fields
public String maxFeePerGas;
public String maxPriorityFeePerGas;
// Receipt fields often included with transaction
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long cumulativeGasUsed;
@JsonSerialize(using = HexLongSerializer.class) @JsonDeserialize(using = HexLongDeserializer.class)
public Long gasUsed;
public String contractAddress;
public String status; // hex status: "0x1" if successful, "0x0" if failed
public List<EventLog> logs;
public String logsBloom;
}

View File

@@ -0,0 +1,108 @@
package stream.io;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
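/**
* A WebSocket client implementing the Ethereum pub/sub flow: an eth_subscribe
* request is tracked by its request id until the node answers with a
* subscription id, after which eth_subscription notifications are routed to
* the registered callback.
*
* <p>Example (a sketch; the endpoint is a placeholder):
* <pre>{@code
* EthereumWebSocketClient client = new EthereumWebSocketClient(URI.create("ws://localhost:8546"));
* client.connectBlocking();
* client.subscribeNewHeads(head -> System.out.println(head.get("number")));
* }</pre>
*/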
public class EthereumWebSocketClient extends WebSocketJsonRpcClient {
private final Map<String, Subscription> activeSubscriptions = new ConcurrentHashMap<>();
private final Logger logger = LoggerFactory.getLogger(EthereumWebSocketClient.class);
private final ObjectMapper mapper = new ObjectMapper();
public EthereumWebSocketClient(URI serverUri) {
super(serverUri);
}
public static class Subscription {
private final String method;
private final Object params;
private final Consumer<JsonNode> callback;
private String subscriptionId;
public Subscription(String method, Object params, Consumer<JsonNode> callback) {
this.method = method;
this.params = params;
this.callback = callback;
}
}
public void subscribeNewHeads(Consumer<JsonNode> callback) {
subscribe(new Object[]{"newHeads"}, callback);
}
public void subscribeLogFilter(String address, String[] topics, Consumer<JsonNode> callback) {
ObjectNode filterParams = mapper.createObjectNode();
filterParams.put("address", address);
filterParams.set("topics", mapper.valueToTree(topics));
subscribe(new Object[]{"logs", filterParams}, callback);
}
private void subscribe(Object params, Consumer<JsonNode> callback) {
Subscription subscription = new Subscription("eth_subscribe", params, callback);
String requestId = sendRpc("eth_subscribe", params);
// Temporarily store the subscription with requestId as key
// Will be updated with actual subscriptionId when we receive the response
activeSubscriptions.put(requestId, subscription);
}
@Override
public void onMessage(String message) {
super.onMessage(message);
try {
JsonNode node = mapper.readTree(message);
// Handle subscription response
if (node.has("id")) {
String id = node.get("id").asText();
// guard against error responses, which carry no "result" field
if (activeSubscriptions.containsKey(id) && node.hasNonNull("result")) {
String subscriptionId = node.get("result").asText();
Subscription subscription = activeSubscriptions.remove(id);
subscription.subscriptionId = subscriptionId;
activeSubscriptions.put(subscriptionId, subscription);
}
}
// Handle subscription notification
if (node.has("method") && "eth_subscription".equals(node.get("method").asText())) {
JsonNode params = node.get("params");
String subscriptionId = params.get("subscription").asText();
Subscription subscription = activeSubscriptions.get(subscriptionId);
if (subscription != null) {
subscription.callback.accept(params.get("result"));
}
}
} catch (IOException e) {
logger.error("Error processing message", e);
}
}
@Override
public void onOpen(ServerHandshake handshake) {
super.onOpen(handshake);
restoreSubscriptions();
}
private void restoreSubscriptions() {
// After a reconnect the server-side subscription ids are stale, so re-issue
// each subscribe and re-key the entries by request id; onMessage() will map
// them to their fresh subscription ids when the responses arrive.
Map<String, Subscription> stale = new ConcurrentHashMap<>(activeSubscriptions);
activeSubscriptions.clear();
for (Subscription subscription : stale.values()) {
String requestId = sendRpc(subscription.method, subscription.params);
activeSubscriptions.put(requestId, subscription);
}
}
public void unsubscribe(String subscriptionId) {
if (activeSubscriptions.containsKey(subscriptionId)) {
sendRpc("eth_unsubscribe", new Object[]{subscriptionId});
activeSubscriptions.remove(subscriptionId);
}
}
}

View File

@@ -0,0 +1,47 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import stream.dto.Block;
import stream.dto.BlockHash;
import stream.dto.BlockId;
import stream.dto.BlockNumber;
import java.util.Collections;
public class FetchBlock<BLOCKID extends BlockId, BLOCK extends Block> extends RichAsyncFunction<BLOCKID, BLOCK> {
// JsonRpcClient is not Serializable, so the function ships only the endpoint
// and builds the client per task in open()
private final String endpoint;
private final Class<BLOCK> blockClass;
private transient JsonRpcClient jsonRpcClient;
public FetchBlock(String endpoint, Class<BLOCK> blockClass) {
this.endpoint = endpoint;
this.blockClass = blockClass;
}
@Override
public void open(OpenContext openContext) throws Exception {
jsonRpcClient = new JsonRpcClient(endpoint);
}
@Override
public void close() throws Exception {
if (jsonRpcClient != null) {
jsonRpcClient.close();
}
}
@Override
public void asyncInvoke(BLOCKID blockId, ResultFuture<BLOCK> resultFuture) throws Exception {
String method;
Object[] params;
if (blockId instanceof BlockNumber) {
method = "eth_getBlockByNumber";
// both methods take a second boolean: false = transaction hashes only
params = new Object[]{"0x" + Long.toHexString(((BlockNumber) blockId).number), false};
} else if (blockId instanceof BlockHash) {
method = "eth_getBlockByHash";
params = new Object[]{((BlockHash) blockId).hash, false};
} else {
resultFuture.complete(Collections.emptyList());
return;
}
try {
var result = jsonRpcClient.invoke(method, params, blockClass);
resultFuture.complete(Collections.<BLOCK>singleton(result));
}
catch (Throwable e) {
resultFuture.completeExceptionally(e);
}
}
}

View File

@@ -0,0 +1,24 @@
package stream.io;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import java.io.IOException;
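/**
* Deserializes hex quantity strings into longs, accepting values with or
* without the "0x" prefix; for example, "0x1b4" and "1b4" both decode to 436L.
*/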
public class HexLongDeserializer extends JsonDeserializer<Long> {
@Override
public Long deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
String value = p.getValueAsString();
if (value == null || value.isEmpty()) {
return null;
}
// Remove "0x" prefix if present
if (value.startsWith("0x")) {
value = value.substring(2);
}
return Long.parseLong(value, 16);
}
}

View File

@@ -0,0 +1,18 @@
package stream.io;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
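/**
* Serializes longs as "0x"-prefixed hex quantity strings; for example,
* 436L encodes to "0x1b4".
*/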
public class HexLongSerializer extends JsonSerializer<Long> {
@Override
public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
if (value == null) {
gen.writeNull();
} else {
gen.writeString("0x" + Long.toHexString(value));
}
}
}

View File

@@ -0,0 +1,63 @@
package stream.io;
import com.googlecode.jsonrpc4j.JsonRpcHttpClient;
import java.net.URL;
import java.util.concurrent.Semaphore;
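/**
* A thin wrapper around jsonrpc4j's JsonRpcHttpClient that caps in-flight
* requests with a semaphore so a busy job cannot flood the RPC endpoint.
*
* <p>Example (a sketch; the endpoint and block number are placeholders):
* <pre>{@code
* JsonRpcClient rpc = new JsonRpcClient("http://localhost:8545", 4);
* EthereumBlock block = rpc.invoke("eth_getBlockByNumber",
*     new Object[]{"0x1", false}, EthereumBlock.class);
* }</pre>
*/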
public class JsonRpcClient implements AutoCloseable {
private final JsonRpcHttpClient jsonRpcClient;
private final Semaphore requestSemaphore;
// Default max concurrent requests
private static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 10;
public JsonRpcClient(String endpoint) throws Exception {
this(endpoint, DEFAULT_MAX_CONCURRENT_REQUESTS);
}
public JsonRpcClient(String endpoint, int maxConcurrentRequests) throws Exception {
this.jsonRpcClient = new JsonRpcHttpClient(new URL(endpoint));
this.requestSemaphore = new Semaphore(maxConcurrentRequests);
}
public <T> T invoke(String methodName, Object argument, Class<T> returnType) throws Throwable {
try {
requestSemaphore.acquire();
try {
return jsonRpcClient.invoke(methodName, argument, returnType);
} finally {
requestSemaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Request interrupted while waiting for semaphore", e);
}
}
public void invoke(String methodName, Object argument) throws Throwable {
try {
requestSemaphore.acquire();
try {
jsonRpcClient.invoke(methodName, argument);
} finally {
requestSemaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Request interrupted while waiting for semaphore", e);
}
}
@Override
public void close() throws Exception {
// jsonrpc4j's JsonRpcHttpClient opens a fresh connection per request,
// so there is no persistent resource to release here
}
/**
* Returns the current number of permits available in the semaphore.
* @return the number of available permits
*/
public int getAvailablePermits() {
return requestSemaphore.availablePermits();
}
}

View File

@@ -0,0 +1,52 @@
package stream.io;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.googlecode.jsonrpc4j.JsonRpcClient;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;
public class WebSocketJsonRpcClient extends WebSocketClient {
private static final Logger logger = LoggerFactory.getLogger(WebSocketJsonRpcClient.class);
private final ObjectMapper mapper = new ObjectMapper();
private final AtomicLong nextId = new AtomicLong(1);
private final JsonRpcClient client = new JsonRpcClient(mapper);
public WebSocketJsonRpcClient(URI serverUri) {
super(serverUri);
}
public String sendRpc(String method, Object params) {
if (!this.isOpen()) {
try {
// connect() returns before the handshake completes, which would let the
// send below race the connection setup; block until the socket is open
this.connectBlocking();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while connecting to " + getURI(), e);
}
}
String requestId = "rpc-" + nextId.getAndIncrement();
var message = client.createRequest(method, params, requestId).toString();
this.send(message);
return requestId;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
logger.info("Connection opened: {}", serverHandshake);
}
@Override
public void onMessage(String message) {
logger.info("Received message: {}", message);
}
@Override
public void onClose(int i, String s, boolean b) {
logger.info("Connection closed: {} {} {}", i, s, b);
}
@Override
public void onError(Exception e) {
logger.error("Connection error", e);
}
}

View File

@@ -0,0 +1,195 @@
package stream.source.newheads;
import org.apache.flink.api.connector.source.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.Block;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
/**
* A Flink Source that emits blocks from the Ethereum blockchain by subscribing to newheads events.
*
* @param <T> The specific Block subclass to emit
*/
public class NewHeadsSource<T extends Block> implements Source<T, NewHeadsSource.NewHeadsSplit, NewHeadsSource.NewHeadsEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(NewHeadsSource.class);
private final URI websocketUri;
private final Class<T> blockClass;
/**
* Creates a new NewHeadsSource.
*
* @param websocketUri The URI of the Ethereum websocket endpoint
* @param blockClass The class of the Block subclass to emit
*/
public NewHeadsSource(URI websocketUri, Class<T> blockClass) {
this.websocketUri = websocketUri;
this.blockClass = blockClass;
}
@Override
public Boundedness getBoundedness() {
// This source is unbounded as it continuously receives new blocks
return Boundedness.CONTINUOUS_UNBOUNDED;
}
@Override
public SourceReader<T, NewHeadsSplit> createReader(SourceReaderContext readerContext) {
return new NewHeadsSourceReader<>(websocketUri, blockClass, readerContext);
}
@Override
public SplitEnumerator<NewHeadsSplit, NewHeadsEnumeratorState> createEnumerator(SplitEnumeratorContext<NewHeadsSplit> enumContext) {
return new NewHeadsSplitEnumerator(enumContext);
}
@Override
public SplitEnumerator<NewHeadsSplit, NewHeadsEnumeratorState> restoreEnumerator(
SplitEnumeratorContext<NewHeadsSplit> enumContext, NewHeadsEnumeratorState state) {
return new NewHeadsSplitEnumerator(enumContext);
}
@Override
public SimpleVersionedSerializer<NewHeadsSplit> getSplitSerializer() {
return new NewHeadsSplitSerializer();
}
// required by the Source interface even though the enumerator state is a dummy
@Override
public SimpleVersionedSerializer<NewHeadsEnumeratorState> getEnumeratorCheckpointSerializer() {
return new NewHeadsEnumeratorStateSerializer();
}
/**
* A simple split for the NewHeadsSource. Since we're just subscribing to a stream of events,
* we don't need to partition the source, so we use a single dummy split.
*/
public static class NewHeadsSplit implements SourceSplit {
private static final String SPLIT_ID = "newheads-split";
@Override
public String splitId() {
return SPLIT_ID;
}
}
/**
* A simple state for the NewHeadsSource enumerator. Since we're just subscribing to a stream of events,
* we don't need to track any state, so this is just a placeholder.
*/
public static class NewHeadsEnumeratorState {
// No state needed
}
/**
* A simple serializer for NewHeadsSplit.
*/
private static class NewHeadsSplitSerializer implements SimpleVersionedSerializer<NewHeadsSplit> {
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(NewHeadsSplit split) {
// Since our split is just a dummy, we can return an empty byte array
return new byte[0];
}
@Override
public NewHeadsSplit deserialize(int version, byte[] serialized) {
// Since our split is just a dummy, we can create a new instance
return new NewHeadsSplit();
}
}
/**
* A simple serializer for NewHeadsEnumeratorState.
*/
private static class NewHeadsEnumeratorStateSerializer implements SimpleVersionedSerializer<NewHeadsEnumeratorState> {
@Override
public int getVersion() {
return 1;
}
@Override
public byte[] serialize(NewHeadsEnumeratorState state) {
// Since our state is just a dummy, we can return an empty byte array
return new byte[0];
}
@Override
public NewHeadsEnumeratorState deserialize(int version, byte[] serialized) {
// Since our state is just a dummy, we can create a new instance
return new NewHeadsEnumeratorState();
}
}
/**
* A simple enumerator for the NewHeadsSource. Since we're just subscribing to a stream of events,
* we don't need to partition the source, so we just assign a single dummy split to the first reader.
*/
private static class NewHeadsSplitEnumerator implements SplitEnumerator<NewHeadsSplit, NewHeadsEnumeratorState> {
private final SplitEnumeratorContext<NewHeadsSplit> context;
private boolean splitAssigned = false;
public NewHeadsSplitEnumerator(SplitEnumeratorContext<NewHeadsSplit> context) {
this.context = context;
}
@Override
public void start() {
// If we have any readers, assign the split
if (context.registeredReaders().size() > 0) {
assignSplit();
}
}
@Override
public void handleSplitRequest(int subtaskId, String requesterHostname) {
// Ignore split requests, we assign splits proactively
}
@Override
public void addSplitsBack(List<NewHeadsSplit> splits, int subtaskId) {
// If a reader failed and we get splits back, we'll reassign them when a new reader registers
splitAssigned = false;
}
@Override
public void addReader(int subtaskId) {
// When a new reader registers, assign the split if it hasn't been assigned yet
if (!splitAssigned) {
assignSplit();
}
}
private void assignSplit() {
if (!context.registeredReaders().isEmpty()) {
// Assign the split to the first reader
int firstReader = context.registeredReaders().keySet().iterator().next();
context.assignSplit(new NewHeadsSplit(), firstReader);
splitAssigned = true;
LOG.info("Assigned NewHeadsSplit to reader {}", firstReader);
}
}
@Override
public NewHeadsEnumeratorState snapshotState(long checkpointId) throws Exception {
return new NewHeadsEnumeratorState();
}
@Override
public void close() throws IOException {
// No resources to clean up
}
}
}

View File

@@ -0,0 +1,38 @@
package stream.source.newheads;
import org.apache.flink.api.connector.source.Source;
import stream.dto.Block;
import java.net.URI;
/**
* A factory class for creating NewHeadsSource and NewHeadsSubscriber instances.
*/
public class NewHeadsSourceFactory {
/**
* Creates a new NewHeadsSubscriber for the given websocket URI and block class.
*
* @param websocketUri The URI of the Ethereum websocket endpoint
* @param blockClass The class of the Block subclass to emit
* @param <T> The specific Block subclass to emit
* @return A new NewHeadsSubscriber
*/
public static <T extends Block> NewHeadsSubscriber<T> createSubscriber(URI websocketUri, Class<T> blockClass) {
// delegate to the two-arg constructor, which installs a lenient mapper that
// ignores node-specific fields missing from the DTOs
return new NewHeadsSubscriber<>(websocketUri, blockClass);
}
/**
* Creates a new NewHeadsSource for the given websocket URI and block class.
* This source will emit blocks from the Ethereum blockchain by subscribing to newheads events.
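*
* <p>Example (a sketch; the endpoint is a placeholder):
* <pre>{@code
* Source<ArbitrumOneBlock, ?, ?> source =
*     NewHeadsSourceFactory.createSource(URI.create("ws://localhost:8546"), ArbitrumOneBlock.class);
* }</pre>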
*
* @param websocketUri The URI of the Ethereum websocket endpoint
* @param blockClass The class of the Block subclass to emit
* @param <T> The specific Block subclass to emit
* @return A new NewHeadsSource
*/
public static <T extends Block> Source<T, ?, ?> createSource(URI websocketUri, Class<T> blockClass) {
return new NewHeadsSource<>(websocketUri, blockClass);
}
}

View File

@@ -0,0 +1,125 @@
package stream.source.newheads;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.Block;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* A Flink SourceReader that reads blocks from the Ethereum blockchain by subscribing to newheads events.
*
* @param <T> The specific Block subclass to emit
*/
public class NewHeadsSourceReader<T extends Block> implements SourceReader<T, NewHeadsSource.NewHeadsSplit> {
private static final Logger LOG = LoggerFactory.getLogger(NewHeadsSourceReader.class);
private final URI websocketUri;
private final Class<T> blockClass;
private final List<NewHeadsSource.NewHeadsSplit> assignedSplits;
private NewHeadsSubscriber<T> subscriber;
/**
* Creates a new NewHeadsSourceReader.
*
* @param websocketUri The URI of the Ethereum websocket endpoint
* @param blockClass The class of the Block subclass to emit
* @param context The source reader context
*/
public NewHeadsSourceReader(URI websocketUri, Class<T> blockClass, @SuppressWarnings("unused") SourceReaderContext context) {
this.websocketUri = websocketUri;
this.blockClass = blockClass;
this.assignedSplits = new ArrayList<>();
}
@Override
public void start() {
// We don't start the subscriber here, we wait until we get a split
}
@SuppressWarnings("RedundantThrows")
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
// If we haven't been assigned a split yet, we can't do anything
if (assignedSplits.isEmpty()) {
return InputStatus.NOTHING_AVAILABLE;
}
// If we haven't created the subscriber yet, create it now
if (subscriber == null) {
try {
subscriber = new NewHeadsSubscriber<>(websocketUri, blockClass);
subscriber.connect();
LOG.info("Connected to Ethereum node at {}", websocketUri);
} catch (Exception e) {
LOG.error("Error connecting to Ethereum node", e);
return InputStatus.NOTHING_AVAILABLE;
}
}
// Try to get the next block
T block = subscriber.getNextBlockNonBlocking();
if (block != null) {
// We got a block, emit it
output.collect(block);
LOG.debug("Emitted block: {}", block);
return InputStatus.MORE_AVAILABLE;
} else {
// No block available right now
return InputStatus.NOTHING_AVAILABLE;
}
}
@Override
public CompletableFuture<Void> isAvailable() {
// Completing immediately while the queue is empty would make the runtime
// spin on pollNext(); only complete right away when a block is buffered
if (subscriber != null && subscriber.isRunning() && subscriber.hasPendingBlocks()) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.supplyAsync(() -> {
try {
// Back off briefly before the runtime polls again
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
});
}
@Override
public List<NewHeadsSource.NewHeadsSplit> snapshotState(long checkpointId) {
// We don't need to snapshot any state, just return the assigned splits
return Collections.unmodifiableList(assignedSplits);
}
@Override
public void addSplits(List<NewHeadsSource.NewHeadsSplit> splits) {
LOG.info("Adding {} splits", splits.size());
assignedSplits.addAll(splits);
}
@Override
public void notifyNoMoreSplits() {
LOG.info("No more splits will be assigned");
}
@Override
public void close() throws Exception {
if (subscriber != null) {
subscriber.close();
}
}
}

View File

@@ -0,0 +1,127 @@
package stream.source.newheads;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.dto.Block;
import stream.io.EthereumWebSocketClient;
import com.fasterxml.jackson.databind.DeserializationFeature;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A utility class that subscribes to Ethereum newheads events and provides a method to get the latest block.
* This class is not a Flink source itself, but can be used by a custom Flink source implementation.
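*
* <p>Example (a sketch; the endpoint is a placeholder):
* <pre>{@code
* try (NewHeadsSubscriber<EthereumBlock> sub =
*         new NewHeadsSubscriber<>(URI.create("ws://localhost:8546"), EthereumBlock.class)) {
*     sub.connect();
*     EthereumBlock head = sub.getNextBlock(); // blocks until a header arrives
* }
* }</pre>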
*
* @param <T> The specific Block subclass to emit
*/
public class NewHeadsSubscriber<T extends Block> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NewHeadsSubscriber.class);
private final URI websocketUri;
private final Class<T> blockClass;
private final BlockingQueue<T> blockQueue;
private final AtomicBoolean running;
private final ObjectMapper mapper;
private EthereumWebSocketClient client;
/**
* Creates a new NewHeadsSubscriber.
*
* @param websocketUri The URI of the Ethereum websocket endpoint
* @param blockClass The class of the Block subclass to emit
*/
public NewHeadsSubscriber(URI websocketUri, Class<T> blockClass) {
this(websocketUri, blockClass,
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false));
}
public NewHeadsSubscriber(URI websocketUri, Class<T> blockClass, ObjectMapper mapper) {
this.websocketUri = websocketUri;
this.blockClass = blockClass;
this.blockQueue = new LinkedBlockingQueue<>();
this.running = new AtomicBoolean(true);
this.mapper = mapper;
}
/**
* Connects to the Ethereum node and subscribes to newheads events.
*
* @throws Exception If an error occurs while connecting or subscribing
*/
public void connect() throws Exception {
// Create and connect the client
client = new EthereumWebSocketClient(websocketUri);
client.connectBlocking();
// Subscribe to newheads
client.subscribeNewHeads(this::processNewHead);
LOG.info("Subscribed to newheads events from {}", websocketUri);
}
/**
* Processes a new block header received from the Ethereum node.
*
* @param jsonNode The JSON representation of the block header
*/
private void processNewHead(JsonNode jsonNode) {
try {
// Convert the JSON to the specified Block subclass
T block = mapper.treeToValue(jsonNode, blockClass);
// Add the block to the queue
blockQueue.add(block);
LOG.debug("Received new block: {}", block);
} catch (Exception e) {
LOG.error("Error processing new block", e);
}
}
/**
* Gets the next block from the queue, blocking if necessary until a block is available.
*
* @return The next block
* @throws InterruptedException If the thread is interrupted while waiting
*/
public T getNextBlock() throws InterruptedException {
return blockQueue.take();
}
/**
* Gets the next block from the queue, returning null if no block is available.
*
* @return The next block, or null if no block is available
*/
public T getNextBlockNonBlocking() {
return blockQueue.poll();
}
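/**
* Checks whether any blocks are currently buffered, without removing them.
* Lets the source reader's availability check avoid busy-polling.
*
* @return True if at least one block is queued
*/
public boolean hasPendingBlocks() {
return !blockQueue.isEmpty();
}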
/**
* Checks if the subscriber is running.
*
* @return True if the subscriber is running, false otherwise
*/
public boolean isRunning() {
return running.get();
}
@Override
public void close() throws Exception {
// Set running to false to signal that we're done
running.set(false);
// Close the client if it exists
if (client != null) {
client.close();
}
}
}

View File

@@ -0,0 +1,7 @@
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n