From 6829053d943574fce48c2feabd3b994097110afb Mon Sep 17 00:00:00 2001 From: tim Date: Mon, 6 Oct 2025 12:33:45 -0400 Subject: [PATCH] BlockElaborator --- src/main/java/stream/dto/BlockHash.java | 5 ++ src/main/java/stream/dto/BlockNumber.java | 5 ++ src/main/java/stream/io/BlockElaborator.java | 56 ++++++++++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 src/main/java/stream/io/BlockElaborator.java diff --git a/src/main/java/stream/dto/BlockHash.java b/src/main/java/stream/dto/BlockHash.java index 443ec0b..ae8df4c 100644 --- a/src/main/java/stream/dto/BlockHash.java +++ b/src/main/java/stream/dto/BlockHash.java @@ -12,4 +12,9 @@ public class BlockHash extends BlockId { public Object getId() { return this.hash; } + + @Override + public String toString() { + return this.hash; + } } diff --git a/src/main/java/stream/dto/BlockNumber.java b/src/main/java/stream/dto/BlockNumber.java index 6aa9b19..679e125 100644 --- a/src/main/java/stream/dto/BlockNumber.java +++ b/src/main/java/stream/dto/BlockNumber.java @@ -12,4 +12,9 @@ public class BlockNumber extends BlockId { public Object getId() { return this.number; } + + @Override + public String toString() { + return String.valueOf(this.number); + } } diff --git a/src/main/java/stream/io/BlockElaborator.java b/src/main/java/stream/io/BlockElaborator.java new file mode 100644 index 0000000..a3c5578 --- /dev/null +++ b/src/main/java/stream/io/BlockElaborator.java @@ -0,0 +1,56 @@ +package stream.io; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.web3j.protocol.Web3j; +import org.web3j.protocol.core.DefaultBlockParameterNumber; +import org.web3j.protocol.core.methods.response.EthBlock; +import org.web3j.tx.ReadonlyTransactionManager; +import org.web3j.tx.gas.DefaultGasProvider; +import stream.dto.BlockHash; +import stream.dto.BlockId; +import stream.dto.BlockNumber; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + + +public class BlockElaborator extends RichAsyncFunction { + private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class); + private transient Web3j w3; + + @Override + public void open(OpenContext openContext) throws Exception { + Map params = getRuntimeContext().getGlobalJobParameters(); + w3 = Web3Client.get(params); + gasProvider = new DefaultGasProvider(); + transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234"); + } + + @Override + public void asyncInvoke(BlockId blockId, final ResultFuture resultFuture) { + CompletableFuture.supplyAsync(() -> { + try { + return getBlock(blockId); + } catch (Exception e) { + log.error("Failed to get block {} on chain {}", blockId, e); + throw new RuntimeException("Error processing token: " + address, e); + } + }).thenAccept(result -> resultFuture.complete(Collections.singleton(result))); + } + + private EthBlock getBlock(BlockId id) { + try { + return (id instanceof BlockNumber) ? + w3.ethGetBlockByNumber(new DefaultBlockParameterNumber(((BlockNumber) id).number), false).sendAsync().get() : + w3.ethGetBlockByHash(((BlockHash) id).hash, false).sendAsync().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } +}