commit 0cd8e1e28f424e8646b5f28639c5e2c83b50464e
Author: tim
Date:   Sat May 17 20:28:20 2025 -0400

    initial checkin with basic arbitrum head blocks working

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..09168ba
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,40 @@
+job.properties
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..cb5395a
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+This project uses Apache Flink 2.x to ingest, process, and act on data from blockchains and from centralized exchanges.
diff --git a/job-example.properties b/job-example.properties
new file mode 100644
index 0000000..9eae8ef
--- /dev/null
+++ b/job-example.properties
@@ -0,0 +1,4 @@
+# Example job.properties file
+
+rpc_url=http://localhost:8545
+ws_url=ws://localhost:8546
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..96937f2
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,235 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>lbarb</groupId>
+    <artifactId>stream</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>Flink Quickstart Job</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <maven.compiler.source>${target.java.version}</maven.compiler.source>
+        <maven.compiler.target>${target.java.version}</maven.compiler.target>
+        <target.java.version>11</target.java.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <flink.version>2.0.0</flink.version>
+        <log4j.version>2.24.1</log4j.version>
+        <web3j.version>5.0.0</web3j.version>
+        <httpclient5.version>5.3.1</httpclient5.version>
+        <jsonrpc4j.version>1.6</jsonrpc4j.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>apache.snapshots</id>
+            <name>Apache Development Snapshot Repository</name>
+            <url>https://repository.apache.org/content/repositories/snapshots/</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.web3j</groupId>
+            <artifactId>core</artifactId>
+            <version>${web3j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.briandilley.jsonrpc4j</groupId>
+            <artifactId>jsonrpc4j</artifactId>
+            <version>${jsonrpc4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents.client5</groupId>
+            <artifactId>httpclient5</artifactId>
+            <version>${httpclient5.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.1.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>org.apache.logging.log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>stream.DataStreamJob</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-shade-plugin</artifactId>
+                                        <versionRange>[3.1.1,)</versionRange>
+                                        <goals>
+                                            <goal>shade</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.1,)</versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+</project>
diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java
new file mode 100644
index 0000000..7b6cc84
--- /dev/null
+++ b/src/main/java/stream/DataStreamJob.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package stream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.ParameterTool;
+import stream.dto.ArbitrumOneBlock;
+import stream.source.newheads.NewHeadsSourceFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+public class DataStreamJob {
+    private static final Logger log = LoggerFactory.getLogger(DataStreamJob.class);
+
+    public static void main(String[] args) throws Exception {
+        // Configuration sources, from lowest priority to highest.
+        // Start by testing for a "job.properties" file.
+        ParameterTool parameterTool;
+        try {
+            parameterTool = ParameterTool.fromPropertiesFile("job.properties");
+            log.info("loaded job.properties");
+        } catch (IOException e) {
+            log.info("job.properties file not found.");
+            parameterTool = ParameterTool.fromMap(Collections.emptyMap());
+        }
+        // and now in increasing priority:
+        ParameterTool parameters = parameterTool
+            .mergeWith(ParameterTool.fromSystemProperties()) // System properties override
+            .mergeWith(ParameterTool.fromMap(System.getenv())) // Environment variables override
+            .mergeWith(ParameterTool.fromArgs(args)); // Command line has highest priority
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // do not do this until considering how secrets are handled by flink
+//        env.getConfig().setGlobalJobParameters(parameters);
+        URI httpUri = new URI(parameters.get("rpc_url", "http://localhost:8545"));
+        URI webSocketUri = new URI(parameters.get("ws_url", "ws://localhost:8546"));
+
+        // Create ObjectMapper for pretty JSON printing
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+        DataStream<ArbitrumOneBlock> blockStream = env
+            .fromSource(
+                NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
+                org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
+                "Ethereum Blocks Source",
+                TypeInformation.of(ArbitrumOneBlock.class)
+            );
+
+        // Map the blocks to pretty-printed JSON strings
+        blockStream
+            .map(block -> {
+                try {
+                    return mapper.writeValueAsString(block);
+                } catch (Exception e) {
+                    return "Error converting block to JSON: " + e.getMessage();
+                }
+            })
+            .print("New Ethereum Block: ");
+
+        env.execute("Ethereum Block Stream");
+    }
+}
\ No newline at end of file
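For reference, a minimal sketch of the layered-configuration behavior used above: mergeWith() lets the argument's keys override keys already present, which is what gives command-line arguments the highest priority. Key names and URLs here are illustrative only.

    import org.apache.flink.util.ParameterTool;
    import java.util.Map;

    public class ParameterPrecedenceExample {
        public static void main(String[] args) {
            // Lower-priority source, e.g. values loaded from job.properties
            ParameterTool base = ParameterTool.fromMap(Map.of("rpc_url", "http://localhost:8545"));
            // Higher-priority source, e.g. command-line arguments
            ParameterTool cli = ParameterTool.fromArgs(new String[]{"--rpc_url", "http://node.example:8545"});
            // mergeWith() overrides keys that appear in both sources
            ParameterTool merged = base.mergeWith(cli);
            System.out.println(merged.get("rpc_url")); // prints http://node.example:8545
        }
    }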
diff --git a/src/main/java/stream/dto/ArbitrumOneBlock.java b/src/main/java/stream/dto/ArbitrumOneBlock.java
new file mode 100644
index 0000000..48f6c4c
--- /dev/null
+++ b/src/main/java/stream/dto/ArbitrumOneBlock.java
@@ -0,0 +1,13 @@
+package stream.dto;
+
+public class ArbitrumOneBlock extends EthereumBlock {
+    public String l1BlockNumber;
+    public String sendRoot;
+    public String sendCount;
+    public Long l1GasPrice;
+    public Long l1GasUsed;
+    public String l1Hash;
+    public Long l1Timestamp;
+    public String sequencerAddress;
+
+}
diff --git a/src/main/java/stream/dto/Block.java b/src/main/java/stream/dto/Block.java
new file mode 100644
index 0000000..dddff13
--- /dev/null
+++ b/src/main/java/stream/dto/Block.java
@@ -0,0 +1,5 @@
+package stream.dto;
+
+public class Block {
+    public Long chainId;
+}
diff --git a/src/main/java/stream/dto/BlockHash.java b/src/main/java/stream/dto/BlockHash.java
new file mode 100644
index 0000000..04e0414
--- /dev/null
+++ b/src/main/java/stream/dto/BlockHash.java
@@ -0,0 +1,10 @@
+package stream.dto;
+
+public class BlockHash extends BlockId {
+    public String hash;
+
+    @Override
+    public Object getId() {
+        return this.hash;
+    }
+}
diff --git a/src/main/java/stream/dto/BlockId.java b/src/main/java/stream/dto/BlockId.java
new file mode 100644
index 0000000..e3a3506
--- /dev/null
+++ b/src/main/java/stream/dto/BlockId.java
@@ -0,0 +1,6 @@
+package stream.dto;
+
+public abstract class BlockId extends Block {
+    abstract public Object getId();
+}
+
diff --git a/src/main/java/stream/dto/BlockNumber.java b/src/main/java/stream/dto/BlockNumber.java
new file mode 100644
index 0000000..c154d38
--- /dev/null
+++ b/src/main/java/stream/dto/BlockNumber.java
@@ -0,0 +1,10 @@
+package stream.dto;
+
+public class BlockNumber extends BlockId {
+    public long number;
+
+    @Override
+    public Object getId() {
+        return this.number;
+    }
+}
diff --git a/src/main/java/stream/dto/EthereumBlock.java b/src/main/java/stream/dto/EthereumBlock.java
new file mode 100644
index 0000000..bb8b10a
--- /dev/null
+++ b/src/main/java/stream/dto/EthereumBlock.java
@@ -0,0 +1,35 @@
+package stream.dto;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import stream.io.HexLongDeserializer;
+import stream.io.HexLongSerializer;
+
+import java.util.List;
+
+public class EthereumBlock extends Block {
+    public String hash;
+
+    @JsonSerialize(using = HexLongSerializer.class)
+    @JsonDeserialize(using = HexLongDeserializer.class)
+    public Long number;
+    public String parentHash;
+    public String nonce;
+    public String sha3Uncles;
+    public String logsBloom;
+    public String transactionsRoot;
+    public String stateRoot;
+    public String receiptsRoot;
+    public String miner;
+    public String difficulty;
+    public String totalDifficulty;
+    public String extraData;
+    public Long size;
+    public String gasLimit;
+    public String gasUsed;
+    @JsonSerialize(using = HexLongSerializer.class)
+    @JsonDeserialize(using = HexLongDeserializer.class)
+    public Long timestamp;
+    public List<Transaction> transactions;
+    public List<String> uncles;
+}
\ No newline at end of file
diff --git a/src/main/java/stream/dto/EventLog.java b/src/main/java/stream/dto/EventLog.java
new file mode 100644
index 0000000..44b67d7
--- /dev/null
+++ b/src/main/java/stream/dto/EventLog.java
@@ -0,0 +1,14 @@
+package stream.dto;
+import java.util.List;
+
+public class EventLog {
+    public boolean removed; // true if log was removed due to chain reorg
+    public Long logIndex;
+    public Long transactionIndex;
+    public String transactionHash;
+    public String blockHash;
+    public Long blockNumber;
+    public String address; // contract that emitted the event
+    public String data; // contains the non-indexed parameters
+    public List<String> topics; // contains the event signature and indexed parameters
+}
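Several numeric fields in these DTOs arrive from the node as 0x-prefixed hex quantities, and the HexLongSerializer/HexLongDeserializer pair on EthereumBlock handles the conversion. A quick round-trip sketch; the sample JSON values are made up:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import stream.dto.EthereumBlock;

    public class HexRoundTripExample {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper()
                    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            // "number" and "timestamp" are hex quantities, as in a newHeads payload
            String json = "{\"hash\":\"0xabc\",\"number\":\"0x1b4\",\"timestamp\":\"0x6647f2a0\"}";
            EthereumBlock block = mapper.readValue(json, EthereumBlock.class);
            System.out.println(block.number); // 436
            // HexLongSerializer writes the value back out as 0x-prefixed hex
            System.out.println(mapper.writeValueAsString(block).contains("\"number\" : \"0x1b4\""));
        }
    }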
diff --git a/src/main/java/stream/dto/Transaction.java b/src/main/java/stream/dto/Transaction.java
new file mode 100644
index 0000000..7fe2f3f
--- /dev/null
+++ b/src/main/java/stream/dto/Transaction.java
@@ -0,0 +1,35 @@
+package stream.dto;
+
+import java.util.List;
+
+public class Transaction {
+    public String hash;
+    public String nonce;
+    public String blockHash;
+    public Long blockNumber;
+    public Long transactionIndex;
+    public String from;
+    public String to;
+    public String value;
+    public Long gas;
+    public String gasPrice;
+    public String input;
+    public String r;
+    public String s;
+    public String v;
+    public String creates; // contract creation address if this tx created a contract
+    public String chainId;
+    public String type; // transaction type (0 = legacy, 1 = EIP-2930, 2 = EIP-1559)
+
+    // EIP-1559 specific fields
+    public String maxFeePerGas;
+    public String maxPriorityFeePerGas;
+
+    // Receipt fields often included with transaction
+    public Long cumulativeGasUsed;
+    public Long gasUsed;
+    public String contractAddress;
+    public boolean status; // true if successful, false if failed
+    public List<EventLog> logs;
+    public String logsBloom;
+}
diff --git a/src/main/java/stream/io/EthereumWebSocketClient.java b/src/main/java/stream/io/EthereumWebSocketClient.java
new file mode 100644
index 0000000..0be91ef
--- /dev/null
+++ b/src/main/java/stream/io/EthereumWebSocketClient.java
@@ -0,0 +1,108 @@
+package stream.io;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.java_websocket.handshake.ServerHandshake;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class EthereumWebSocketClient extends WebSocketJsonRpcClient {
+    private final Map<String, Subscription> activeSubscriptions = new ConcurrentHashMap<>();
+    private final Logger logger = LoggerFactory.getLogger(EthereumWebSocketClient.class);
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    public EthereumWebSocketClient(URI serverUri) {
+        super(serverUri);
+    }
+
+    public static class Subscription {
+        private final String method;
+        private final Object params;
+        private final Consumer<JsonNode> callback;
+        private String subscriptionId;
+
+        public Subscription(String method, Object params, Consumer<JsonNode> callback) {
+            this.method = method;
+            this.params = params;
+            this.callback = callback;
+        }
+    }
+
+    public void subscribeNewHeads(Consumer<JsonNode> callback) {
+        subscribe(new Object[]{"newHeads"}, callback);
+    }
+
+    public void subscribeLogFilter(String address, String[] topics, Consumer<JsonNode> callback) {
+        ObjectNode filterParams = mapper.createObjectNode();
+        filterParams.put("address", address);
+        filterParams.set("topics", mapper.valueToTree(topics));
+        subscribe(new Object[]{"logs", filterParams}, callback);
+    }
+
+    private void subscribe(Object params, Consumer<JsonNode> callback) {
+        Subscription subscription = new Subscription("eth_subscribe", params, callback);
+        String requestId = sendRpc("eth_subscribe", params);
+        // Temporarily store the subscription with requestId as key.
+        // It will be re-keyed with the actual subscriptionId when we receive the response.
+        activeSubscriptions.put(requestId, subscription);
+    }
+
+    @Override
+    public void onMessage(String message) {
+        super.onMessage(message);
+        try {
+            JsonNode node = mapper.readTree(message);
+
+            // Handle subscription response
+            if (node.has("id")) {
+                String id = node.get("id").asText();
+                if (activeSubscriptions.containsKey(id) && node.has("result")) {
+                    String subscriptionId = node.get("result").asText();
+                    Subscription subscription = activeSubscriptions.remove(id);
+                    subscription.subscriptionId = subscriptionId;
+                    activeSubscriptions.put(subscriptionId, subscription);
+                }
+            }
+
+            // Handle subscription notification
+            if (node.has("method") && "eth_subscription".equals(node.get("method").asText())) {
+                JsonNode params = node.get("params");
+                String subscriptionId = params.get("subscription").asText();
+                Subscription subscription = activeSubscriptions.get(subscriptionId);
+                if (subscription != null) {
+                    subscription.callback.accept(params.get("result"));
+                }
+            }
+        } catch (IOException e) {
+            logger.error("Error processing message", e);
+        }
+    }
+
+    @Override
+    public void onOpen(ServerHandshake handshake) {
+        super.onOpen(handshake);
+        restoreSubscriptions();
+    }
+
+    private void restoreSubscriptions() {
+        // Re-issue eth_subscribe for each known subscription after a reconnect, and
+        // re-key each one by its new request id so the response handler can pick up
+        // the new subscription id (re-sending alone would orphan the responses).
+        Map<String, Subscription> previous = new ConcurrentHashMap<>(activeSubscriptions);
+        activeSubscriptions.clear();
+        for (Subscription subscription : previous.values()) {
+            String requestId = sendRpc(subscription.method, subscription.params);
+            activeSubscriptions.put(requestId, subscription);
+        }
+    }
+
+    public void unsubscribe(String subscriptionId) {
+        if (activeSubscriptions.containsKey(subscriptionId)) {
+            sendRpc("eth_unsubscribe", new Object[]{subscriptionId});
+            activeSubscriptions.remove(subscriptionId);
+        }
+    }
+}
\ No newline at end of file
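A minimal sketch of driving this client directly, outside of Flink; the endpoint URL is a placeholder and the 30-second sleep is just to let a few heads arrive:

    import com.fasterxml.jackson.databind.JsonNode;
    import stream.io.EthereumWebSocketClient;
    import java.net.URI;

    public class SubscribeExample {
        public static void main(String[] args) throws Exception {
            EthereumWebSocketClient client = new EthereumWebSocketClient(new URI("ws://localhost:8546"));
            client.connectBlocking();
            // The callback receives the "result" object of each eth_subscription notification
            client.subscribeNewHeads((JsonNode head) ->
                    System.out.println("new head: " + head.get("number")));
            Thread.sleep(30_000);
            client.close();
        }
    }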
diff --git a/src/main/java/stream/io/FetchBlock.java b/src/main/java/stream/io/FetchBlock.java
new file mode 100644
index 0000000..e773d46
--- /dev/null
+++ b/src/main/java/stream/io/FetchBlock.java
@@ -0,0 +1,47 @@
+package stream.io;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import stream.dto.Block;
+import stream.dto.BlockHash;
+import stream.dto.BlockId;
+import stream.dto.BlockNumber;
+
+import java.util.Collections;
+
+public class FetchBlock<BLOCKID extends BlockId, BLOCK extends Block> extends RichAsyncFunction<BLOCKID, BLOCK> {
+
+    private final JsonRpcClient jsonRpcClient;
+    private final Class<BLOCK> blockClass;
+
+    public FetchBlock(JsonRpcClient jsonRpcClient, Class<BLOCK> blockClass) {
+        this.jsonRpcClient = jsonRpcClient;
+        this.blockClass = blockClass;
+    }
+
+    @Override
+    public void asyncInvoke(BLOCKID blockId, ResultFuture<BLOCK> resultFuture) throws Exception {
+        String method;
+        Object param;
+
+        if (blockId instanceof BlockNumber) {
+            method = "eth_getBlockByNumber";
+            // positional params: block number as hex quantity, plus the full-transaction-objects flag
+            param = new Object[]{"0x" + Long.toHexString(((BlockNumber) blockId).number), true};
+        } else if (blockId instanceof BlockHash) {
+            method = "eth_getBlockByHash";
+            param = new Object[]{((BlockHash) blockId).hash, true};
+        } else {
+            resultFuture.complete(Collections.emptyList());
+            return;
+        }
+
+        try {
+            var result = jsonRpcClient.invoke(method, param, blockClass);
+            resultFuture.complete(Collections.singleton(result));
+        }
+        catch (Throwable e) {
+            resultFuture.completeExceptionally(e);
+        }
+    }
+
+}
diff --git a/src/main/java/stream/io/HexLongDeserializer.java b/src/main/java/stream/io/HexLongDeserializer.java
new file mode 100644
index 0000000..73616ef
--- /dev/null
+++ b/src/main/java/stream/io/HexLongDeserializer.java
@@ -0,0 +1,24 @@
+package stream.io;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+
+import java.io.IOException;
+
+public class HexLongDeserializer extends JsonDeserializer<Long> {
+    @Override
+    public Long deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+        String value = p.getValueAsString();
+        if (value == null || value.isEmpty()) {
+            return null;
+        }
+
+        // Remove "0x" prefix if present
+        if (value.startsWith("0x")) {
+            value = value.substring(2);
+        }
+
+        return Long.parseLong(value, 16);
+    }
+}
\ No newline at end of file
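FetchBlock is meant to be wired through Flink's async I/O operator. A sketch of the wiring, assuming an upstream DataStream<BlockNumber> and a JsonRpcClient built for the HTTP endpoint; note that for a real cluster deployment the client field would need to be serializable or created in open(), which this initial version does not address:

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import stream.dto.BlockNumber;
    import stream.dto.EthereumBlock;
    import stream.io.FetchBlock;
    import stream.io.JsonRpcClient;
    import java.util.concurrent.TimeUnit;

    public class AsyncFetchWiring {
        public static DataStream<EthereumBlock> fetchBlocks(
                DataStream<BlockNumber> numbers, JsonRpcClient rpc) {
            return AsyncDataStream.unorderedWait(
                    numbers,
                    new FetchBlock<>(rpc, EthereumBlock.class),
                    30, TimeUnit.SECONDS, // per-request timeout
                    10);                  // max in-flight requests, matching the client's default semaphore
        }
    }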
diff --git a/src/main/java/stream/io/HexLongSerializer.java b/src/main/java/stream/io/HexLongSerializer.java
new file mode 100644
index 0000000..44f72a2
--- /dev/null
+++ b/src/main/java/stream/io/HexLongSerializer.java
@@ -0,0 +1,18 @@
+package stream.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+
+public class HexLongSerializer extends JsonSerializer<Long> {
+    @Override
+    public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+        if (value == null) {
+            gen.writeNull();
+        } else {
+            gen.writeString("0x" + Long.toHexString(value));
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/stream/io/JsonRpcClient.java b/src/main/java/stream/io/JsonRpcClient.java
new file mode 100644
index 0000000..b667f33
--- /dev/null
+++ b/src/main/java/stream/io/JsonRpcClient.java
@@ -0,0 +1,63 @@
+package stream.io;
+
+import com.googlecode.jsonrpc4j.JsonRpcHttpClient;
+
+import java.net.URL;
+import java.util.concurrent.Semaphore;
+
+public class JsonRpcClient implements AutoCloseable {
+    private final JsonRpcHttpClient jsonRpcClient;
+    private final Semaphore requestSemaphore;
+
+    // Default max concurrent requests
+    private static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 10;
+
+    public JsonRpcClient(String endpoint) throws Exception {
+        this(endpoint, DEFAULT_MAX_CONCURRENT_REQUESTS);
+    }
+
+    public JsonRpcClient(String endpoint, int maxConcurrentRequests) throws Exception {
+        this.jsonRpcClient = new JsonRpcHttpClient(new URL(endpoint));
+        this.requestSemaphore = new Semaphore(maxConcurrentRequests);
+    }
+
+    public <T> T invoke(String methodName, Object argument, Class<T> returnType) throws Throwable {
+        try {
+            requestSemaphore.acquire();
+            try {
+                return jsonRpcClient.invoke(methodName, argument, returnType);
+            } finally {
+                requestSemaphore.release();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Request interrupted while waiting for semaphore", e);
+        }
+    }
+
+    public void invoke(String methodName, Object argument) throws Throwable {
+        try {
+            requestSemaphore.acquire();
+            try {
+                jsonRpcClient.invoke(methodName, argument);
+            } finally {
+                requestSemaphore.release();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Request interrupted while waiting for semaphore", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
+
+    /**
+     * Returns the current number of permits available in the semaphore.
+     *
+     * @return the number of available permits
+     */
+    public int getAvailablePermits() {
+        return requestSemaphore.availablePermits();
+    }
+}
\ No newline at end of file
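A sketch of a one-off call through the throttled client; eth_blockNumber takes no parameters (hence the empty positional-params array) and returns a hex quantity, and the URL is a placeholder:

    import stream.io.JsonRpcClient;

    public class RpcCallExample {
        public static void main(String[] args) throws Throwable {
            try (JsonRpcClient rpc = new JsonRpcClient("http://localhost:8545", 5)) {
                String latest = rpc.invoke("eth_blockNumber", new Object[]{}, String.class);
                System.out.println("latest block: " + Long.parseLong(latest.substring(2), 16));
            }
        }
    }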
diff --git a/src/main/java/stream/io/WebSocketJsonRpcClient.java b/src/main/java/stream/io/WebSocketJsonRpcClient.java
new file mode 100644
index 0000000..2f68637
--- /dev/null
+++ b/src/main/java/stream/io/WebSocketJsonRpcClient.java
@@ -0,0 +1,52 @@
+package stream.io;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.googlecode.jsonrpc4j.JsonRpcClient;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class WebSocketJsonRpcClient extends WebSocketClient {
+    private static final Logger logger = LoggerFactory.getLogger(WebSocketJsonRpcClient.class);
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final AtomicLong nextId = new AtomicLong(1);
+    private final JsonRpcClient client = new JsonRpcClient(mapper);
+
+    public WebSocketJsonRpcClient(URI serverUri) {
+        super(serverUri);
+    }
+
+    public String sendRpc(String method, Object params) {
+        // connectBlocking() rather than connect(): send() would fail if the
+        // handshake has not completed yet
+        if (!this.isOpen()) {
+            try {
+                this.connectBlocking();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while connecting", e);
+            }
+        }
+        String requestId = "rpc-" + nextId.getAndIncrement();
+        var message = client.createRequest(method, params, requestId).toString();
+        this.send(message);
+        return requestId;
+    }
+
+    @Override
+    public void onOpen(ServerHandshake serverHandshake) {
+        logger.info("Connection opened: {}", serverHandshake);
+    }
+
+    @Override
+    public void onMessage(String message) {
+        logger.info("Received message: {}", message);
+    }
+
+    @Override
+    public void onClose(int i, String s, boolean b) {
+        logger.info("Connection closed: {} {} {}", i, s, b);
+    }
+
+    @Override
+    public void onError(Exception e) {
+        logger.error("Connection error", e);
+    }
+
+}
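For orientation, the frames exchanged over the socket follow the standard eth_subscribe protocol and look roughly like this (the subscription id is an illustrative value, and the notification's result carries the block-header fields consumed by the DTOs above):

    {"id":"rpc-1","jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"]}
    {"jsonrpc":"2.0","id":"rpc-1","result":"0x9cef478923ff08bf67fde6c64013158d"}
    {"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x9cef478923ff08bf67fde6c64013158d","result":{ ...block header fields... }}}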
diff --git a/src/main/java/stream/source/newheads/NewHeadsSource.java b/src/main/java/stream/source/newheads/NewHeadsSource.java
new file mode 100644
index 0000000..3652deb
--- /dev/null
+++ b/src/main/java/stream/source/newheads/NewHeadsSource.java
@@ -0,0 +1,195 @@
+package stream.source.newheads;
+
+import org.apache.flink.api.connector.source.*;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import stream.dto.Block;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * A Flink Source that emits blocks from the Ethereum blockchain by subscribing to newHeads events.
+ *
+ * @param <T> The specific Block subclass to emit
+ */
+public class NewHeadsSource<T extends Block> implements Source<T, NewHeadsSource.NewHeadsSplit, NewHeadsSource.NewHeadsEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NewHeadsSource.class);
+
+    private final URI websocketUri;
+    private final Class<T> blockClass;
+
+    /**
+     * Creates a new NewHeadsSource.
+     *
+     * @param websocketUri The URI of the Ethereum websocket endpoint
+     * @param blockClass The class of the Block subclass to emit
+     */
+    public NewHeadsSource(URI websocketUri, Class<T> blockClass) {
+        this.websocketUri = websocketUri;
+        this.blockClass = blockClass;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        // This source is unbounded as it continuously receives new blocks
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<T, NewHeadsSplit> createReader(SourceReaderContext readerContext) {
+        return new NewHeadsSourceReader<>(websocketUri, blockClass, readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<NewHeadsSplit, NewHeadsEnumeratorState> createEnumerator(SplitEnumeratorContext<NewHeadsSplit> enumContext) {
+        return new NewHeadsSplitEnumerator(enumContext);
+    }
+
+    @Override
+    public SplitEnumerator<NewHeadsSplit, NewHeadsEnumeratorState> restoreEnumerator(
+            SplitEnumeratorContext<NewHeadsSplit> enumContext, NewHeadsEnumeratorState state) {
+        return new NewHeadsSplitEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<NewHeadsSplit> getSplitSerializer() {
+        return new NewHeadsSplitSerializer();
+    }
+
+    // This method is not needed in Flink 2.0.0
+
+    @Override
+    public SimpleVersionedSerializer<NewHeadsEnumeratorState> getEnumeratorCheckpointSerializer() {
+        return new NewHeadsEnumeratorStateSerializer();
+    }
+
+    /**
+     * A simple split for the NewHeadsSource. Since we're just subscribing to a stream of events,
+     * we don't need to partition the source, so we use a single dummy split.
+     */
+    public static class NewHeadsSplit implements SourceSplit {
+        private static final String SPLIT_ID = "newheads-split";
+
+        @Override
+        public String splitId() {
+            return SPLIT_ID;
+        }
+    }
+
+    /**
+     * A simple state for the NewHeadsSource enumerator. Since we're just subscribing to a stream of events,
+     * we don't need to track any state, so this is just a placeholder.
+     */
+    public static class NewHeadsEnumeratorState {
+        // No state needed
+    }
+
+    /**
+     * A simple serializer for NewHeadsSplit.
+     */
+    private static class NewHeadsSplitSerializer implements SimpleVersionedSerializer<NewHeadsSplit> {
+        @Override
+        public int getVersion() {
+            return 1;
+        }
+
+        @Override
+        public byte[] serialize(NewHeadsSplit split) {
+            // Since our split is just a dummy, we can return an empty byte array
+            return new byte[0];
+        }
+
+        @Override
+        public NewHeadsSplit deserialize(int version, byte[] serialized) {
+            // Since our split is just a dummy, we can create a new instance
+            return new NewHeadsSplit();
+        }
+    }
+
+    /**
+     * A simple serializer for NewHeadsEnumeratorState.
+     */
+    private static class NewHeadsEnumeratorStateSerializer implements SimpleVersionedSerializer<NewHeadsEnumeratorState> {
+        @Override
+        public int getVersion() {
+            return 1;
+        }
+
+        @Override
+        public byte[] serialize(NewHeadsEnumeratorState state) {
+            // Since our state is just a dummy, we can return an empty byte array
+            return new byte[0];
+        }
+
+        @Override
+        public NewHeadsEnumeratorState deserialize(int version, byte[] serialized) {
+            // Since our state is just a dummy, we can create a new instance
+            return new NewHeadsEnumeratorState();
+        }
+    }
+
+    /**
+     * A simple enumerator for the NewHeadsSource. Since we're just subscribing to a stream of events,
+     * we don't need to partition the source, so we just assign a single dummy split to the first reader.
+     */
+    private static class NewHeadsSplitEnumerator implements SplitEnumerator<NewHeadsSplit, NewHeadsEnumeratorState> {
+        private final SplitEnumeratorContext<NewHeadsSplit> context;
+        private boolean splitAssigned = false;
+
+        public NewHeadsSplitEnumerator(SplitEnumeratorContext<NewHeadsSplit> context) {
+            this.context = context;
+        }
+
+        @Override
+        public void start() {
+            // If we have any readers, assign the split
+            if (!context.registeredReaders().isEmpty()) {
+                assignSplit();
+            }
+        }
+
+        @Override
+        public void handleSplitRequest(int subtaskId, String requesterHostname) {
+            // Ignore split requests, we assign splits proactively
+        }
+
+        @Override
+        public void addSplitsBack(List<NewHeadsSplit> splits, int subtaskId) {
+            // If a reader failed and we get splits back, we'll reassign them when a new reader registers
+            splitAssigned = false;
+        }
+
+        @Override
+        public void addReader(int subtaskId) {
+            // When a new reader registers, assign the split if it hasn't been assigned yet
+            if (!splitAssigned) {
+                assignSplit();
+            }
+        }
+
+        private void assignSplit() {
+            if (!context.registeredReaders().isEmpty()) {
+                // Assign the split to the first reader
+                int firstReader = context.registeredReaders().keySet().iterator().next();
+                context.assignSplit(new NewHeadsSplit(), firstReader);
+                splitAssigned = true;
+                LOG.info("Assigned NewHeadsSplit to reader {}", firstReader);
+            }
+        }
+
+        @Override
+        public NewHeadsEnumeratorState snapshotState(long checkpointId) throws Exception {
+            return new NewHeadsEnumeratorState();
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No resources to clean up
+        }
+    }
+}
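Because the enumerator hands its single dummy split to exactly one reader, only one source subtask ever receives data; pinning the source's parallelism makes that explicit rather than leaving idle subtasks. A fragment sketch, reusing the names from DataStreamJob's main():

    DataStream<ArbitrumOneBlock> blockStream = env
            .fromSource(
                    NewHeadsSourceFactory.createSource(webSocketUri, ArbitrumOneBlock.class),
                    org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
                    "Ethereum Blocks Source",
                    TypeInformation.of(ArbitrumOneBlock.class))
            .setParallelism(1); // subtasks beyond the first would never be assigned the split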
diff --git a/src/main/java/stream/source/newheads/NewHeadsSourceFactory.java b/src/main/java/stream/source/newheads/NewHeadsSourceFactory.java
new file mode 100644
index 0000000..0d02b5f
--- /dev/null
+++ b/src/main/java/stream/source/newheads/NewHeadsSourceFactory.java
@@ -0,0 +1,38 @@
+package stream.source.newheads;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.connector.source.Source;
+import stream.dto.Block;
+
+import java.net.URI;
+
+/**
+ * A factory class for creating NewHeadsSource and NewHeadsSubscriber instances.
+ */
+public class NewHeadsSourceFactory {
+
+    /**
+     * Creates a new NewHeadsSubscriber for the given websocket URI and block class.
+     *
+     * @param websocketUri The URI of the Ethereum websocket endpoint
+     * @param blockClass The class of the Block subclass to emit
+     * @param <T> The specific Block subclass to emit
+     * @return A new NewHeadsSubscriber
+     */
+    public static <T extends Block> NewHeadsSubscriber<T> createSubscriber(URI websocketUri, Class<T> blockClass) {
+        // Configure the mapper the same way as the subscriber's own default:
+        // newHeads payloads carry fields the DTOs don't model, which would
+        // otherwise fail deserialization
+        return new NewHeadsSubscriber<>(websocketUri, blockClass,
+                new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false));
+    }
+
+    /**
+     * Creates a new NewHeadsSource for the given websocket URI and block class.
+     * This source will emit blocks from the Ethereum blockchain by subscribing to newHeads events.
+     *
+     * @param websocketUri The URI of the Ethereum websocket endpoint
+     * @param blockClass The class of the Block subclass to emit
+     * @param <T> The specific Block subclass to emit
+     * @return A new NewHeadsSource
+     */
+    public static <T extends Block> Source<T, ?, ?> createSource(URI websocketUri, Class<T> blockClass) {
+        return new NewHeadsSource<>(websocketUri, blockClass);
+    }
+}
diff --git a/src/main/java/stream/source/newheads/NewHeadsSourceReader.java b/src/main/java/stream/source/newheads/NewHeadsSourceReader.java
new file mode 100644
index 0000000..01ead50
--- /dev/null
+++ b/src/main/java/stream/source/newheads/NewHeadsSourceReader.java
@@ -0,0 +1,125 @@
+package stream.source.newheads;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import stream.dto.Block;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A Flink SourceReader that reads blocks from the Ethereum blockchain by subscribing to newHeads events.
+ *
+ * @param <T> The specific Block subclass to emit
+ */
+public class NewHeadsSourceReader<T extends Block> implements SourceReader<T, NewHeadsSource.NewHeadsSplit> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NewHeadsSourceReader.class);
+
+    private final URI websocketUri;
+    private final Class<T> blockClass;
+    private final List<NewHeadsSource.NewHeadsSplit> assignedSplits;
+
+    private NewHeadsSubscriber<T> subscriber;
+
+    /**
+     * Creates a new NewHeadsSourceReader.
+     *
+     * @param websocketUri The URI of the Ethereum websocket endpoint
+     * @param blockClass The class of the Block subclass to emit
+     * @param context The source reader context
+     */
+    public NewHeadsSourceReader(URI websocketUri, Class<T> blockClass, @SuppressWarnings("unused") SourceReaderContext context) {
+        this.websocketUri = websocketUri;
+        this.blockClass = blockClass;
+        this.assignedSplits = new ArrayList<>();
+    }
+
+    @Override
+    public void start() {
+        // We don't start the subscriber here, we wait until we get a split
+    }
+
+    @SuppressWarnings("RedundantThrows")
+    @Override
+    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
+        // If we haven't been assigned a split yet, we can't do anything
+        if (assignedSplits.isEmpty()) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+
+        // If we haven't created the subscriber yet, create it now
+        if (subscriber == null) {
+            try {
+                subscriber = new NewHeadsSubscriber<>(websocketUri, blockClass);
+                subscriber.connect();
+                LOG.info("Connected to Ethereum node at {}", websocketUri);
+            } catch (Exception e) {
+                LOG.error("Error connecting to Ethereum node", e);
+                return InputStatus.NOTHING_AVAILABLE;
+            }
+        }
+
+        // Try to get the next block
+        T block = subscriber.getNextBlockNonBlocking();
+        if (block != null) {
+            // We got a block, emit it
+            output.collect(block);
+            LOG.debug("Emitted block: {}", block);
+            return InputStatus.MORE_AVAILABLE;
+        } else {
+            // No block available right now
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        // If we have a subscriber and it's running, we might get more data
+        if (subscriber != null && subscriber.isRunning()) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    // Wait a bit before checking again
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                return null;
+            });
+        }
+    }
+
+    @Override
+    public List<NewHeadsSource.NewHeadsSplit> snapshotState(long checkpointId) {
+        // We don't need to snapshot any state, just return the assigned splits
+        return Collections.unmodifiableList(assignedSplits);
+    }
+
+    @Override
+    public void addSplits(List<NewHeadsSource.NewHeadsSplit> splits) {
+        LOG.info("Adding {} splits", splits.size());
+        assignedSplits.addAll(splits);
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {
+        LOG.info("No more splits will be assigned");
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (subscriber != null) {
+            subscriber.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/stream/source/newheads/NewHeadsSubscriber.java b/src/main/java/stream/source/newheads/NewHeadsSubscriber.java
new file mode 100644
index 0000000..9f3997f
--- /dev/null
+++ b/src/main/java/stream/source/newheads/NewHeadsSubscriber.java
@@ -0,0 +1,127 @@
+package stream.source.newheads;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import stream.dto.Block;
+import stream.io.EthereumWebSocketClient;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A utility class that subscribes to Ethereum newHeads events and provides a method to get the latest block.
+ * This class is not a Flink source itself, but can be used by a custom Flink source implementation.
+ *
+ * @param <T> The specific Block subclass to emit
+ */
+public class NewHeadsSubscriber<T extends Block> implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NewHeadsSubscriber.class);
+
+    private final URI websocketUri;
+    private final Class<T> blockClass;
+    private final BlockingQueue<T> blockQueue;
+    private final AtomicBoolean running;
+    private final ObjectMapper mapper;
+
+    private EthereumWebSocketClient client;
+
+    /**
+     * Creates a new NewHeadsSubscriber.
+     *
+     * @param websocketUri The URI of the Ethereum websocket endpoint
+     * @param blockClass The class of the Block subclass to emit
+     */
+    public NewHeadsSubscriber(URI websocketUri, Class<T> blockClass) {
+        this(websocketUri, blockClass,
+            new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false));
+    }
+
+    public NewHeadsSubscriber(URI websocketUri, Class<T> blockClass, ObjectMapper mapper) {
+        this.websocketUri = websocketUri;
+        this.blockClass = blockClass;
+        this.blockQueue = new LinkedBlockingQueue<>();
+        this.running = new AtomicBoolean(true);
+        this.mapper = mapper;
+    }
+
+    /**
+     * Connects to the Ethereum node and subscribes to newHeads events.
+     *
+     * @throws Exception If an error occurs while connecting or subscribing
+     */
+    public void connect() throws Exception {
+        // Create and connect the client
+        client = new EthereumWebSocketClient(websocketUri);
+        client.connectBlocking();
+
+        // Subscribe to newHeads
+        client.subscribeNewHeads(this::processNewHead);
+
+        LOG.info("Subscribed to newHeads events from {}", websocketUri);
+    }
+
+    /**
+     * Processes a new block header received from the Ethereum node.
+     *
+     * @param jsonNode The JSON representation of the block header
+     */
+    private void processNewHead(JsonNode jsonNode) {
+        try {
+            // Convert the JSON to the specified Block subclass
+            T block = mapper.treeToValue(jsonNode, blockClass);
+
+            // Add the block to the queue
+            blockQueue.add(block);
+
+            LOG.debug("Received new block: {}", block);
+        } catch (Exception e) {
+            LOG.error("Error processing new block", e);
+        }
+    }
+
+    /**
+     * Gets the next block from the queue, blocking if necessary until a block is available.
+     *
+     * @return The next block
+     * @throws InterruptedException If the thread is interrupted while waiting
+     */
+    public T getNextBlock() throws InterruptedException {
+        return blockQueue.take();
+    }
+
+    /**
+     * Gets the next block from the queue, returning null if no block is available.
+     *
+     * @return The next block, or null if no block is available
+     */
+    public T getNextBlockNonBlocking() {
+        return blockQueue.poll();
+    }
+
+    /**
+     * Checks if the subscriber is running.
+     *
+     * @return True if the subscriber is running, false otherwise
+     */
+    public boolean isRunning() {
+        return running.get();
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Set running to false to signal that we're done
+        running.set(false);
+
+        // Close the client if it exists
+        if (client != null) {
+            client.close();
+        }
+    }
+}
\ No newline at end of file
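The subscriber can also be used on its own, without the Flink source wrapper; a blocking consumption-loop sketch, with a placeholder URL and an arbitrary ten-block limit:

    import stream.dto.ArbitrumOneBlock;
    import stream.source.newheads.NewHeadsSubscriber;
    import java.net.URI;

    public class SubscriberLoopExample {
        public static void main(String[] args) throws Exception {
            try (NewHeadsSubscriber<ArbitrumOneBlock> subscriber =
                         new NewHeadsSubscriber<>(new URI("ws://localhost:8546"), ArbitrumOneBlock.class)) {
                subscriber.connect();
                for (int i = 0; i < 10; i++) {
                    // getNextBlock() blocks until a head arrives
                    ArbitrumOneBlock block = subscriber.getNextBlock();
                    System.out.println(block.number + " l1=" + block.l1BlockNumber);
                }
            }
        }
    }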
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..3546643
--- /dev/null
+++ b/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
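The reader and subscriber log each received block at DEBUG, which the INFO root logger suppresses. To see those lines, a logger section like this could be appended to log4j2.properties (the "newheads" logger id is an arbitrary label):

    logger.newheads.name = stream.source.newheads
    logger.newheads.level = DEBUG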