Swap object stream with verified amount in and amount out
This commit is contained in:
@@ -24,15 +24,21 @@ import org.apache.flink.streaming.api.datastream.AsyncDataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
|
||||
import org.apache.flink.util.ParameterTool;
|
||||
import org.apache.flink.util.Collector;
|
||||
import stream.dto.*;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
import stream.io.PoolElaborator;
|
||||
import stream.io.TokenElaborator;
|
||||
import stream.io.SwapEnricher;
|
||||
import stream.source.eventlog.EventLogSourceFactory;
|
||||
import stream.source.newheads.NewHeadsSourceFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
@@ -87,6 +93,18 @@ public class DataStreamJob {
|
||||
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
||||
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
||||
|
||||
// Log raw swap events
|
||||
swapStream.map(swapLog -> {
|
||||
log.info("RAW SWAP EVENT - Pool: {}, Sender: {}, Recipient: {}, Amount0: {}, Amount1: {}, SqrtPriceX96: {}, Liquidity: {}, Tick: {}, TxHash: {}",
|
||||
swapLog.address, swapLog.sender, swapLog.recipient,
|
||||
swapLog.amount0, swapLog.amount1, swapLog.sqrtPriceX96,
|
||||
swapLog.liquidity, swapLog.tick, swapLog.transactionHash);
|
||||
return swapLog;
|
||||
});
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
SingleOutputStreamOperator<ElaboratedSwapEvent> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
||||
swapStream,
|
||||
@@ -113,35 +131,74 @@ public class DataStreamJob {
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
// Print comprehensive swap event data with token metadata
|
||||
elaboratedSwapStream
|
||||
.map(event -> {
|
||||
// Connect swap events with token metadata to create FullyElaboratedSwap
|
||||
DataStream<FullyElaboratedSwap> fullyElaboratedSwaps = elaboratedSwapStream
|
||||
.connect(elaboratedTokens)
|
||||
.flatMap(new RichCoFlatMapFunction<ElaboratedSwapEvent, Token, FullyElaboratedSwap>() {
|
||||
private final Map<String, Token> tokenCache = new HashMap<>();
|
||||
private final Map<String, ElaboratedSwapEvent> pendingSwaps = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void flatMap1(ElaboratedSwapEvent event, Collector<FullyElaboratedSwap> out) throws Exception {
|
||||
String token0Addr = event.getToken0Address().toLowerCase();
|
||||
String token1Addr = event.getToken1Address().toLowerCase();
|
||||
|
||||
Token token0 = tokenCache.get(token0Addr);
|
||||
Token token1 = tokenCache.get(token1Addr);
|
||||
|
||||
if (token0 != null && token1 != null) {
|
||||
// We have both tokens, create FullyElaboratedSwap
|
||||
SwapEventLog swapLog = event.getSwapEvent();
|
||||
FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1);
|
||||
out.collect(fullSwap);
|
||||
} else {
|
||||
// Cache the swap event until we get both tokens
|
||||
pendingSwaps.put(event.getSwapEvent().transactionHash, event);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flatMap2(Token token, Collector<FullyElaboratedSwap> out) throws Exception {
|
||||
// Cache the token
|
||||
tokenCache.put(token.address.toLowerCase(), token);
|
||||
|
||||
// Check if any pending swaps can now be completed
|
||||
Iterator<Map.Entry<String, ElaboratedSwapEvent>> iterator = pendingSwaps.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<String, ElaboratedSwapEvent> entry = iterator.next();
|
||||
ElaboratedSwapEvent event = entry.getValue();
|
||||
|
||||
String token0Addr = event.getToken0Address().toLowerCase();
|
||||
String token1Addr = event.getToken1Address().toLowerCase();
|
||||
|
||||
Token token0 = tokenCache.get(token0Addr);
|
||||
Token token1 = tokenCache.get(token1Addr);
|
||||
|
||||
if (token0 != null && token1 != null) {
|
||||
SwapEventLog swapLog = event.getSwapEvent();
|
||||
FullyElaboratedSwap fullSwap = new FullyElaboratedSwap(event, token0, token1);
|
||||
out.collect(fullSwap);
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Apply SwapEnricher to create final Swap objects with calculated prices
|
||||
DataStream<Swap> enrichedSwaps = fullyElaboratedSwaps
|
||||
.map(new SwapEnricher());
|
||||
|
||||
// Print the final enriched swap objects
|
||||
enrichedSwaps
|
||||
.map(swap -> {
|
||||
try {
|
||||
String json = mapper.writeValueAsString(event);
|
||||
log.info("SWAP EVENT - Pool: {} Block: {} TxHash: {} Token0: {} Token1: {} Amount0: {} Amount1: {} Tick: {}",
|
||||
event.getSwapEvent().address,
|
||||
event.getSwapEvent().blockNumber,
|
||||
event.getSwapEvent().transactionHash,
|
||||
event.getToken0Address(),
|
||||
event.getToken1Address(),
|
||||
event.getSwapEvent().amount0,
|
||||
event.getSwapEvent().amount1,
|
||||
event.getSwapEvent().tick);
|
||||
String json = mapper.writeValueAsString(swap);
|
||||
return json;
|
||||
} catch (Exception e) {
|
||||
return "Error converting elaborated swap event to JSON: " + e.getMessage();
|
||||
return "Error converting enriched swap to JSON: " + e.getMessage();
|
||||
}
|
||||
})
|
||||
.print("Swap Event: ");
|
||||
|
||||
// Print token metadata when available
|
||||
elaboratedTokens
|
||||
.map(token -> {
|
||||
log.info("TOKEN METADATA - Address: {} Name: {} Symbol: {} Decimals: {}",
|
||||
token.address, token.name, token.symbol, token.decimals);
|
||||
return token.toString();
|
||||
})
|
||||
.print("Token: ");
|
||||
.print("Enriched Swap: ");
|
||||
|
||||
env.execute("Ethereum Block Stream");
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ import org.web3j.abi.datatypes.Bool;
|
||||
import org.web3j.abi.datatypes.Type;
|
||||
import org.web3j.abi.datatypes.Utf8String;
|
||||
import org.web3j.abi.datatypes.generated.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
@@ -20,6 +22,8 @@ import static stream.io.EthUtils.keccak;
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public class ElaboratedEventType<T extends EventLog> extends EventType {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ElaboratedEventType.class);
|
||||
|
||||
public transient String name;
|
||||
public transient List<String> paramNames;
|
||||
@@ -63,7 +67,9 @@ public class ElaboratedEventType<T extends EventLog> extends EventType {
|
||||
int dataValuesIndex = 0;
|
||||
int topicsIndex = 1; // the first topic is the event signature
|
||||
List<Object> args = new ArrayList<>();
|
||||
for (TypeReference<Type> paramType : this.paramTypes) {
|
||||
for (int i = 0; i < this.paramTypes.size(); i++) {
|
||||
TypeReference<Type> paramType = this.paramTypes.get(i);
|
||||
String paramName = this.paramNames.get(i);
|
||||
Object value;
|
||||
if (paramType.isIndexed()) {
|
||||
String encoded = topics.get(topicsIndex++);
|
||||
|
||||
@@ -8,23 +8,35 @@ public class Swap extends ChainId {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
Long time; // milliseconds
|
||||
Exchange exchange;
|
||||
String pool; // address
|
||||
String takerAsset; // token address
|
||||
String makerAsset; // token address
|
||||
BigDecimal amount; // positive means the taker bought; negative means the taker sold.
|
||||
BigDecimal price;
|
||||
public Long time; // milliseconds
|
||||
public Exchange exchange;
|
||||
public String pool; // address
|
||||
public String takerAsset; // token address
|
||||
public String makerAsset; // token address
|
||||
public BigDecimal amountIn; // positive means the taker bought; negative means the taker sold.
|
||||
public BigDecimal amountOut; // positive means the taker bought; negative means the taker sold.
|
||||
public BigDecimal price;
|
||||
|
||||
public Swap(int chainId, Long time, Exchange exchange, String pool, String takerAsset, String makerAsset,
|
||||
BigDecimal amount, BigDecimal price) {
|
||||
BigDecimal amountIn, BigDecimal amountOut, BigDecimal price) {
|
||||
super(chainId);
|
||||
this.time = time;
|
||||
this.exchange = exchange;
|
||||
this.pool = pool;
|
||||
this.takerAsset = takerAsset;
|
||||
this.makerAsset = makerAsset;
|
||||
this.amount = amount;
|
||||
this.price = price;
|
||||
this.amountIn = amountIn;
|
||||
this.amountOut = amountOut;
|
||||
this.price = price; // Total trade value
|
||||
}
|
||||
|
||||
// Getter methods
|
||||
public Long getTime() { return time; }
|
||||
public Exchange getExchange() { return exchange; }
|
||||
public String getPool() { return pool; }
|
||||
public String getTakerAsset() { return takerAsset; }
|
||||
public String getMakerAsset() { return makerAsset; }
|
||||
public BigDecimal getAmountIn() { return amountIn; }
|
||||
public BigDecimal getAmountOut() { return amountOut; }
|
||||
public BigDecimal getPrice() { return price; }
|
||||
}
|
||||
|
||||
@@ -29,13 +29,9 @@ public class SwapEventLog extends EventLog implements Serializable {
|
||||
|
||||
public String sender;
|
||||
public String recipient;
|
||||
@BigInt
|
||||
public BigInteger amount0;
|
||||
@BigInt
|
||||
public BigInteger amount1;
|
||||
@BigInt
|
||||
public BigInteger sqrtPriceX96;
|
||||
@BigInt
|
||||
public BigInteger liquidity;
|
||||
public int tick;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user