BlockElaborator

This commit is contained in:
tim
2025-10-06 12:33:45 -04:00
parent 3a64f5630c
commit 6829053d94
3 changed files with 66 additions and 0 deletions

View File

@@ -12,4 +12,9 @@ public class BlockHash extends BlockId {
public Object getId() {
return this.hash;
}
@Override
public String toString() {
return this.hash;
}
}

View File

@@ -12,4 +12,9 @@ public class BlockNumber extends BlockId {
public Object getId() {
return this.number;
}
@Override
public String toString() {
return String.valueOf(this.number);
}
}

View File

@@ -0,0 +1,56 @@
package stream.io;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.tx.ReadonlyTransactionManager;
import org.web3j.tx.gas.DefaultGasProvider;
import stream.dto.BlockHash;
import stream.dto.BlockId;
import stream.dto.BlockNumber;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class BlockElaborator extends RichAsyncFunction<BlockId, EthBlock> {
private static final Logger log = LoggerFactory.getLogger(BlockElaborator.class);
private transient Web3j w3;
@Override
public void open(OpenContext openContext) throws Exception {
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
w3 = Web3Client.get(params);
gasProvider = new DefaultGasProvider();
transactionManager = new ReadonlyTransactionManager(w3, "0x1234000000000000000000000000000000001234");
}
@Override
public void asyncInvoke(BlockId blockId, final ResultFuture<EthBlock> resultFuture) {
CompletableFuture.supplyAsync(() -> {
try {
return getBlock(blockId);
} catch (Exception e) {
log.error("Failed to get block {} on chain {}", blockId, e);
throw new RuntimeException("Error processing token: " + address, e);
}
}).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
}
private EthBlock getBlock(BlockId id) {
try {
return (id instanceof BlockNumber) ?
w3.ethGetBlockByNumber(new DefaultBlockParameterNumber(((BlockNumber) id).number), false).sendAsync().get() :
w3.ethGetBlockByHash(((BlockHash) id).hash, false).sendAsync().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}