adding renamed elaborators
This commit is contained in:
59
src/main/java/stream/dto/SwapEventWithTokenIds.java
Normal file
59
src/main/java/stream/dto/SwapEventWithTokenIds.java
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
package stream.dto;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.math.BigInteger;
|
||||||
|
|
||||||
|
public class SwapEventWithTokenIds implements Serializable {
|
||||||
|
private SwapEventLog swapEvent;
|
||||||
|
private String token0Address;
|
||||||
|
private String token1Address;
|
||||||
|
private BigInteger timestamp;
|
||||||
|
|
||||||
|
public SwapEventWithTokenIds() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
this.token0Address = token0Address;
|
||||||
|
this.token1Address = token1Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwapEventWithTokenIds(SwapEventLog swapEvent, String token0Address, String token1Address, BigInteger timestamp) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
this.token0Address = token0Address;
|
||||||
|
this.token1Address = token1Address;
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwapEventLog getSwapEvent() {
|
||||||
|
return swapEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSwapEvent(SwapEventLog swapEvent) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getToken0Address() {
|
||||||
|
return token0Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setToken0Address(String token0Address) {
|
||||||
|
this.token0Address = token0Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getToken1Address() {
|
||||||
|
return token1Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setToken1Address(String token1Address) {
|
||||||
|
this.token1Address = token1Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BigInteger getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTimestamp(BigInteger timestamp) {
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
}
|
||||||
42
src/main/java/stream/dto/SwapEventWithTokenMetadata.java
Normal file
42
src/main/java/stream/dto/SwapEventWithTokenMetadata.java
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
package stream.dto;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class SwapEventWithTokenMetadata implements Serializable {
|
||||||
|
private SwapEventWithTokenIds swapEvent;
|
||||||
|
private Token token0;
|
||||||
|
private Token token1;
|
||||||
|
|
||||||
|
public SwapEventWithTokenMetadata() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwapEventWithTokenMetadata(SwapEventWithTokenIds swapEvent, Token token0, Token token1) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
this.token0 = token0;
|
||||||
|
this.token1 = token1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwapEventWithTokenIds getSwapEvent() {
|
||||||
|
return swapEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSwapEvent(SwapEventWithTokenIds swapEvent) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Token getToken0() {
|
||||||
|
return token0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setToken0(Token token0) {
|
||||||
|
this.token0 = token0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Token getToken1() {
|
||||||
|
return token1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setToken1(Token token1) {
|
||||||
|
this.token1 = token1;
|
||||||
|
}
|
||||||
|
}
|
||||||
24
src/main/java/stream/dto/SwapWithTimestamp.java
Normal file
24
src/main/java/stream/dto/SwapWithTimestamp.java
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
package stream.dto;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.math.BigInteger;
|
||||||
|
|
||||||
|
public class SwapWithTimestamp implements Serializable {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
private final SwapEventLog swapEvent;
|
||||||
|
private final BigInteger timestamp;
|
||||||
|
|
||||||
|
public SwapWithTimestamp(SwapEventLog swapEvent, BigInteger timestamp) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwapEventLog getSwapEvent() {
|
||||||
|
return swapEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BigInteger getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
}
|
||||||
80
src/main/java/stream/io/BlockTimestampElaborator.java
Normal file
80
src/main/java/stream/io/BlockTimestampElaborator.java
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.OpenContext;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.web3j.protocol.core.methods.response.EthBlock;
|
||||||
|
import stream.dto.BlockHash;
|
||||||
|
import stream.dto.SwapEventLog;
|
||||||
|
import stream.dto.SwapWithTimestamp;
|
||||||
|
|
||||||
|
import java.math.BigInteger;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
public class BlockTimestampElaborator extends RichAsyncFunction<SwapEventLog, SwapWithTimestamp> {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(BlockTimestampElaborator.class);
|
||||||
|
private transient BlockElaborator blockElaborator;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(OpenContext openContext) throws Exception {
|
||||||
|
blockElaborator = new BlockElaborator();
|
||||||
|
blockElaborator.setRuntimeContext(getRuntimeContext());
|
||||||
|
blockElaborator.open(openContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncInvoke(SwapEventLog swapEvent, ResultFuture<SwapWithTimestamp> resultFuture) {
|
||||||
|
try {
|
||||||
|
BlockHash blockHash = new BlockHash();
|
||||||
|
blockHash.hash = swapEvent.blockHash;
|
||||||
|
|
||||||
|
// Create a custom ResultFuture to handle the block elaboration result
|
||||||
|
ResultFuture<EthBlock> blockResultFuture = new ResultFuture<EthBlock>() {
|
||||||
|
@Override
|
||||||
|
public void complete(java.util.Collection<EthBlock> result) {
|
||||||
|
try {
|
||||||
|
if (!result.isEmpty()) {
|
||||||
|
EthBlock ethBlock = result.iterator().next();
|
||||||
|
BigInteger timestamp = ethBlock.getBlock().getTimestamp();
|
||||||
|
SwapWithTimestamp swapWithTimestamp = new SwapWithTimestamp(swapEvent, timestamp);
|
||||||
|
resultFuture.complete(Collections.singleton(swapWithTimestamp));
|
||||||
|
} else {
|
||||||
|
log.error("No block found for block hash {}", swapEvent.blockHash);
|
||||||
|
resultFuture.completeExceptionally(new RuntimeException("No block found"));
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to get timestamp for block {} of swap in tx {}",
|
||||||
|
swapEvent.blockHash, swapEvent.transactionHash, e);
|
||||||
|
resultFuture.completeExceptionally(new RuntimeException("Error getting block timestamp", e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void complete(org.apache.flink.streaming.api.functions.async.CollectionSupplier<EthBlock> collectionSupplier) {
|
||||||
|
try {
|
||||||
|
complete(collectionSupplier.get());
|
||||||
|
} catch (Exception e) {
|
||||||
|
completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completeExceptionally(Throwable error) {
|
||||||
|
log.error("Failed to get block {} for swap in tx {}",
|
||||||
|
swapEvent.blockHash, swapEvent.transactionHash, error);
|
||||||
|
resultFuture.completeExceptionally(error);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Invoke the block elaborator asynchronously
|
||||||
|
blockElaborator.asyncInvoke(blockHash, blockResultFuture);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Error in BlockTimestampElaborator for swap in tx {}",
|
||||||
|
swapEvent.transactionHash, e);
|
||||||
|
resultFuture.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
81
src/main/java/stream/io/PoolTokenIdElaborator.java
Normal file
81
src/main/java/stream/io/PoolTokenIdElaborator.java
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import stream.dto.SwapWithTimestamp;
|
||||||
|
import stream.dto.SwapEventWithTokenIds;
|
||||||
|
import stream.contract.UniswapV3Pool;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.OpenContext;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||||
|
import org.web3j.protocol.Web3j;
|
||||||
|
import org.web3j.protocol.http.HttpService;
|
||||||
|
import org.web3j.crypto.Credentials;
|
||||||
|
import org.web3j.tx.gas.DefaultGasProvider;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.Collections;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class PoolTokenIdElaborator extends RichAsyncFunction<SwapWithTimestamp, SwapEventWithTokenIds> {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PoolTokenIdElaborator.class);
|
||||||
|
private transient Web3j web3j;
|
||||||
|
private transient Credentials credentials;
|
||||||
|
private transient DefaultGasProvider gasProvider;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(OpenContext openContext) throws Exception {
|
||||||
|
super.open(openContext);
|
||||||
|
|
||||||
|
// Get RPC URL from job parameters
|
||||||
|
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
|
||||||
|
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||||
|
// TODO: Get from configuration if needed
|
||||||
|
|
||||||
|
// Initialize Web3j
|
||||||
|
this.web3j = Web3j.build(new HttpService(rpcUrl));
|
||||||
|
|
||||||
|
// Dummy credentials for read-only operations
|
||||||
|
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
|
||||||
|
|
||||||
|
// Default gas provider
|
||||||
|
this.gasProvider = new DefaultGasProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncInvoke(SwapWithTimestamp swapWithTimestamp, ResultFuture<SwapEventWithTokenIds> resultFuture) throws Exception {
|
||||||
|
CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
// Load the pool contract
|
||||||
|
UniswapV3Pool pool = UniswapV3Pool.load(
|
||||||
|
swapWithTimestamp.getSwapEvent().getAddress(), // Pool address from the event
|
||||||
|
web3j,
|
||||||
|
credentials,
|
||||||
|
gasProvider
|
||||||
|
);
|
||||||
|
|
||||||
|
// Get token addresses
|
||||||
|
String token0 = pool.token0().send();
|
||||||
|
String token1 = pool.token1().send();
|
||||||
|
|
||||||
|
// Create enriched event with timestamp
|
||||||
|
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), token0, token1, swapWithTimestamp.getTimestamp());
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Error fetching pool tokens for swap in pool {}",
|
||||||
|
swapWithTimestamp.getSwapEvent().getAddress(), e);
|
||||||
|
// Return original without enrichment but with timestamp
|
||||||
|
return new SwapEventWithTokenIds(swapWithTimestamp.getSwapEvent(), null, null, swapWithTimestamp.getTimestamp());
|
||||||
|
}
|
||||||
|
}).thenAccept(enriched -> {
|
||||||
|
resultFuture.complete(Collections.singletonList(enriched));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws Exception {
|
||||||
|
if (web3j != null) {
|
||||||
|
web3j.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
94
src/main/java/stream/io/SwapElaborator.java
Normal file
94
src/main/java/stream/io/SwapElaborator.java
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
|
||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.RichMapFunction;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import stream.dto.*;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.math.BigInteger;
|
||||||
|
import java.math.MathContext;
|
||||||
|
import java.math.RoundingMode;
|
||||||
|
|
||||||
|
public class SwapElaborator extends RichMapFunction<SwapEventWithTokenMetadata, Swap> {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SwapElaborator.class);
|
||||||
|
private static final MathContext MATH_CONTEXT = new MathContext(18, RoundingMode.HALF_UP);
|
||||||
|
private static final BigDecimal Q96 = new BigDecimal(2).pow(96);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Swap map(SwapEventWithTokenMetadata swapWithMetadata) throws Exception {
|
||||||
|
SwapEventWithTokenIds event = swapWithMetadata.getSwapEvent();
|
||||||
|
SwapEventLog swapLog = event.getSwapEvent();
|
||||||
|
Token token0 = swapWithMetadata.getToken0();
|
||||||
|
Token token1 = swapWithMetadata.getToken1();
|
||||||
|
|
||||||
|
// Use timestamp from block elaboration and set exchange as UNISWAP_V3
|
||||||
|
Long time = event.getTimestamp().longValue();
|
||||||
|
Exchange exchange = Exchange.UNISWAP_V3;
|
||||||
|
|
||||||
|
// Pool address
|
||||||
|
String pool = swapLog.address;
|
||||||
|
|
||||||
|
// Get sqrtPriceX96 and calculate actual price
|
||||||
|
BigDecimal sqrtPriceX96 = new BigDecimal(swapLog.sqrtPriceX96);
|
||||||
|
BigDecimal sqrtPrice = sqrtPriceX96.divide(Q96, MATH_CONTEXT);
|
||||||
|
BigDecimal price = sqrtPrice.multiply(sqrtPrice, MATH_CONTEXT);
|
||||||
|
|
||||||
|
// Adjust price for decimals (price is token1/token0)
|
||||||
|
int decimalDiff = token0.decimals - token1.decimals;
|
||||||
|
BigDecimal adjustedPrice;
|
||||||
|
if (decimalDiff >= 0) {
|
||||||
|
BigDecimal decimalAdjustment = new BigDecimal(10).pow(decimalDiff);
|
||||||
|
adjustedPrice = price.multiply(decimalAdjustment, MATH_CONTEXT);
|
||||||
|
} else {
|
||||||
|
BigDecimal decimalAdjustment = new BigDecimal(10).pow(-decimalDiff);
|
||||||
|
adjustedPrice = price.divide(decimalAdjustment, MATH_CONTEXT);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine which token is in and which is out based on amount signs
|
||||||
|
boolean isToken0In = swapLog.amount0.compareTo(BigInteger.ZERO) < 0;
|
||||||
|
|
||||||
|
String takerAsset;
|
||||||
|
String makerAsset;
|
||||||
|
BigDecimal amountIn;
|
||||||
|
BigDecimal amountOut;
|
||||||
|
BigDecimal finalPrice;
|
||||||
|
|
||||||
|
if (isToken0In) {
|
||||||
|
// User is sending token0, receiving token1
|
||||||
|
takerAsset = event.getToken0Address();
|
||||||
|
makerAsset = event.getToken1Address();
|
||||||
|
|
||||||
|
// Convert amounts to human-readable format using decimals
|
||||||
|
amountIn = new BigDecimal(swapLog.amount0.abs())
|
||||||
|
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||||
|
amountOut = new BigDecimal(swapLog.amount1)
|
||||||
|
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
|
||||||
|
|
||||||
|
// Price is how much token1 you get per token0
|
||||||
|
finalPrice = adjustedPrice;
|
||||||
|
} else {
|
||||||
|
// User is sending token1, receiving token0
|
||||||
|
takerAsset = event.getToken1Address();
|
||||||
|
makerAsset = event.getToken0Address();
|
||||||
|
|
||||||
|
// Convert amounts to human-readable format using decimals
|
||||||
|
amountIn = new BigDecimal(swapLog.amount1.abs())
|
||||||
|
.divide(new BigDecimal(10).pow(token1.decimals), MATH_CONTEXT);
|
||||||
|
amountOut = new BigDecimal(swapLog.amount0)
|
||||||
|
.divide(new BigDecimal(10).pow(token0.decimals), MATH_CONTEXT);
|
||||||
|
|
||||||
|
// Price is how much token0 you get per token1 (inverse of adjustedPrice)
|
||||||
|
finalPrice = BigDecimal.ONE.divide(adjustedPrice, MATH_CONTEXT);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Enriched swap - Pool: {} {} -> {} Amount: {} -> {} Price: {}",
|
||||||
|
pool, token0.symbol + "/" + token1.symbol,
|
||||||
|
isToken0In ? token1.symbol : token0.symbol,
|
||||||
|
amountIn, amountOut, finalPrice);
|
||||||
|
|
||||||
|
// Pass both amountIn and amountOut to constructor with unit price
|
||||||
|
return new Swap(42161, time, exchange, pool, takerAsset, makerAsset, amountIn, amountOut, finalPrice);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user