Compare commits
2 Commits
7449b4b4b1
...
00ae425700
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
00ae425700 | ||
|
|
e58fdc0b06 |
4
.mvn/jvm.config
Normal file
4
.mvn/jvm.config
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
--add-opens java.base/java.lang=ALL-UNNAMED
|
||||||
|
--add-opens java.base/java.lang.reflect=ALL-UNNAMED
|
||||||
|
--add-opens java.base/java.net=ALL-UNNAMED
|
||||||
|
--add-opens java.base/java.util=ALL-UNNAMED
|
||||||
8
eval "$(ssh-agent -s)"
Normal file
8
eval "$(ssh-agent -s)"
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
-----BEGIN OPENSSH PRIVATE KEY-----
|
||||||
|
b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABBr7BKqJe
|
||||||
|
BOIqEijWReA7+HAAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIM9dTaag/jewNYcY
|
||||||
|
416Fg7f3KjRUaaQ5x44CAWLsZtz4AAAAoCvYBgMQ51N0rrqc1FGsatMFbY+DQxyu4KDSJM
|
||||||
|
dH0Gr4m/B+3jyxHgQ+RmwRpoGk80Ew4zN6RIrsVZf8chfCybfyxFd/WuxvBIf4f3SVsu8z
|
||||||
|
k1WZ50YQvG5eATRzrRNDlcDxyyyLeSjn7Au2wEYYVPgzqvO5MzH+/B5PWItaVdb7fGUQ1W
|
||||||
|
xPRd+ZdJgZ0pIEPmjU440qG06hgYFTS0ywNv0=
|
||||||
|
-----END OPENSSH PRIVATE KEY-----
|
||||||
1
eval "$(ssh-agent -s)".pub
Normal file
1
eval "$(ssh-agent -s)".pub
Normal file
@@ -0,0 +1 @@
|
|||||||
|
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIM9dTaag/jewNYcY416Fg7f3KjRUaaQ5x44CAWLsZtz4 surbhi.jhavar@gmail.com
|
||||||
@@ -19,19 +19,19 @@
|
|||||||
package stream;
|
package stream;
|
||||||
|
|
||||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||||
|
import org.apache.flink.api.common.functions.FlatMapFunction;
|
||||||
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
|
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
|
||||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
import org.apache.flink.util.ParameterTool;
|
import org.apache.flink.util.ParameterTool;
|
||||||
import stream.contract.ArbitrumOne;
|
import org.apache.flink.util.Collector;
|
||||||
import stream.dto.*;
|
import stream.dto.*;
|
||||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||||
|
import stream.io.PoolElaborator;
|
||||||
import stream.io.TokenElaborator;
|
import stream.io.TokenElaborator;
|
||||||
import stream.source.eventlog.EventLogSourceFactory;
|
import stream.source.eventlog.EventLogSourceFactory;
|
||||||
import stream.source.newheads.NewHeadsSourceFactory;
|
import stream.source.newheads.NewHeadsSourceFactory;
|
||||||
import stream.dto.AddressId;
|
|
||||||
import stream.dto.Token;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
@@ -74,13 +74,6 @@ public class DataStreamJob {
|
|||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
mapper.enable(SerializationFeature.INDENT_OUTPUT);
|
mapper.enable(SerializationFeature.INDENT_OUTPUT);
|
||||||
|
|
||||||
// Test token metadata markup
|
|
||||||
// https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/datastream/operators/asyncio/
|
|
||||||
DataStream<AddressId> usdcStream = env.fromData(new AddressId(42161, ArbitrumOne.ADDR_USDC));
|
|
||||||
SingleOutputStreamOperator<Token> elaboratedUsdcStream =
|
|
||||||
AsyncDataStream.unorderedWait(usdcStream, new TokenElaborator(), 10, TimeUnit.MINUTES);
|
|
||||||
// debug print
|
|
||||||
elaboratedUsdcStream.map(token -> {log.info("Token: {}",token); return token.toString();}).print();
|
|
||||||
|
|
||||||
DataStream<ArbitrumOneBlock> arbitrumHeads = env
|
DataStream<ArbitrumOneBlock> arbitrumHeads = env
|
||||||
.fromSource(
|
.fromSource(
|
||||||
@@ -94,68 +87,61 @@ public class DataStreamJob {
|
|||||||
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
DataStream<BurnEventLog> burnStream = getEventStream(env, webSocketUri, BurnEventLog.SIGNATURE, BurnEventLog.class, "Uniswap v3 Burn Events");
|
||||||
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
DataStream<SwapEventLog> swapStream = getEventStream(env, webSocketUri, SwapEventLog.SIGNATURE, SwapEventLog.class, "Uniswap v3 Swap Events");
|
||||||
|
|
||||||
// Map the blocks to pretty-printed JSON strings
|
|
||||||
/*
|
|
||||||
blockStream
|
|
||||||
.map(block -> {
|
|
||||||
try {
|
|
||||||
return mapper.writeValueAsString(block);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return "Error converting block to JSON: " + e.getMessage();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.print("New Ethereum Block: ");
|
|
||||||
|
|
||||||
transferStream
|
SingleOutputStreamOperator<ElaboratedSwapEvent> elaboratedSwapStream = AsyncDataStream.unorderedWait(
|
||||||
.map(event -> {
|
swapStream,
|
||||||
try {
|
new PoolElaborator(),
|
||||||
return mapper.writeValueAsString(event);
|
30,
|
||||||
} catch (Exception e) {
|
TimeUnit.SECONDS
|
||||||
return "Error converting transfer event to JSON: " + e.getMessage();
|
);
|
||||||
|
|
||||||
|
// Extract token addresses and elaborate with metadata
|
||||||
|
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
|
||||||
|
.flatMap(new FlatMapFunction<ElaboratedSwapEvent, AddressId>() {
|
||||||
|
@Override
|
||||||
|
public void flatMap(ElaboratedSwapEvent event, Collector<AddressId> collector) throws Exception {
|
||||||
|
collector.collect(new AddressId(42161, event.getToken0Address()));
|
||||||
|
collector.collect(new AddressId(42161, event.getToken1Address()));
|
||||||
}
|
}
|
||||||
})
|
});
|
||||||
.print("Transfer Event: ");
|
|
||||||
|
|
||||||
swapStream
|
// Elaborate tokens with metadata
|
||||||
|
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
|
||||||
|
tokenAddresses,
|
||||||
|
new TokenElaborator(),
|
||||||
|
120,
|
||||||
|
TimeUnit.SECONDS
|
||||||
|
);
|
||||||
|
|
||||||
|
// Print comprehensive swap event data with token metadata
|
||||||
|
elaboratedSwapStream
|
||||||
.map(event -> {
|
.map(event -> {
|
||||||
try {
|
try {
|
||||||
return mapper.writeValueAsString(event);
|
String json = mapper.writeValueAsString(event);
|
||||||
|
log.info("SWAP EVENT - Pool: {} Block: {} TxHash: {} Token0: {} Token1: {} Amount0: {} Amount1: {} Tick: {}",
|
||||||
|
event.getSwapEvent().address,
|
||||||
|
event.getSwapEvent().blockNumber,
|
||||||
|
event.getSwapEvent().transactionHash,
|
||||||
|
event.getToken0Address(),
|
||||||
|
event.getToken1Address(),
|
||||||
|
event.getSwapEvent().amount0,
|
||||||
|
event.getSwapEvent().amount1,
|
||||||
|
event.getSwapEvent().tick);
|
||||||
|
return json;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return "Error converting swap event to JSON: " + e.getMessage();
|
return "Error converting elaborated swap event to JSON: " + e.getMessage();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.print("Swap Event: ");
|
.print("Swap Event: ");
|
||||||
*/
|
|
||||||
|
|
||||||
mintStream
|
// Print token metadata when available
|
||||||
.map(event -> {
|
elaboratedTokens
|
||||||
try {
|
.map(token -> {
|
||||||
return mapper.writeValueAsString(event);
|
log.info("TOKEN METADATA - Address: {} Name: {} Symbol: {} Decimals: {}",
|
||||||
} catch (Exception e) {
|
token.address, token.name, token.symbol, token.decimals);
|
||||||
return "Error converting mint event to JSON: " + e.getMessage();
|
return token.toString();
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.print("Mint Event: ");
|
.print("Token: ");
|
||||||
|
|
||||||
burnStream
|
|
||||||
.map(event -> {
|
|
||||||
try {
|
|
||||||
return mapper.writeValueAsString(event);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return "Error converting burn event to JSON: " + e.getMessage();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.print("Burn Event: ");
|
|
||||||
|
|
||||||
swapStream
|
|
||||||
.map(event -> {
|
|
||||||
try {
|
|
||||||
return mapper.writeValueAsString(event);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return "Error converting swap event to JSON: " + e.getMessage();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.print("Swap Event: ");
|
|
||||||
|
|
||||||
env.execute("Ethereum Block Stream");
|
env.execute("Ethereum Block Stream");
|
||||||
}
|
}
|
||||||
|
|||||||
42
src/main/java/stream/dto/ElaboratedSwapEvent.java
Normal file
42
src/main/java/stream/dto/ElaboratedSwapEvent.java
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
package stream.dto;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class ElaboratedSwapEvent implements Serializable {
|
||||||
|
private SwapEventLog swapEvent;
|
||||||
|
private String token0Address;
|
||||||
|
private String token1Address;
|
||||||
|
|
||||||
|
public ElaboratedSwapEvent() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public ElaboratedSwapEvent(SwapEventLog swapEvent, String token0Address, String token1Address) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
this.token0Address = token0Address;
|
||||||
|
this.token1Address = token1Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SwapEventLog getSwapEvent() {
|
||||||
|
return swapEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSwapEvent(SwapEventLog swapEvent) {
|
||||||
|
this.swapEvent = swapEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getToken0Address() {
|
||||||
|
return token0Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setToken0Address(String token0Address) {
|
||||||
|
this.token0Address = token0Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getToken1Address() {
|
||||||
|
return token1Address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setToken1Address(String token1Address) {
|
||||||
|
this.token1Address = token1Address;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -39,4 +39,7 @@ public class SwapEventLog extends EventLog implements Serializable {
|
|||||||
public BigInteger liquidity;
|
public BigInteger liquidity;
|
||||||
public int tick;
|
public int tick;
|
||||||
|
|
||||||
|
public String getAddress() {
|
||||||
|
return this.address;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
80
src/main/java/stream/io/PoolElaborator.java
Normal file
80
src/main/java/stream/io/PoolElaborator.java
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package stream.io;
|
||||||
|
|
||||||
|
import stream.dto.SwapEventLog;
|
||||||
|
import stream.dto.ElaboratedSwapEvent;
|
||||||
|
import stream.contract.UniswapV3Pool;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.functions.OpenContext;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
|
||||||
|
import org.web3j.protocol.Web3j;
|
||||||
|
import org.web3j.protocol.http.HttpService;
|
||||||
|
import org.web3j.crypto.Credentials;
|
||||||
|
import org.web3j.tx.gas.DefaultGasProvider;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.Collections;
|
||||||
|
import org.apache.flink.streaming.api.functions.async.ResultFuture;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class PoolElaborator extends RichAsyncFunction<SwapEventLog, ElaboratedSwapEvent> {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PoolElaborator.class);
|
||||||
|
private transient Web3j web3j;
|
||||||
|
private transient Credentials credentials;
|
||||||
|
private transient DefaultGasProvider gasProvider;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(OpenContext openContext) throws Exception {
|
||||||
|
super.open(openContext);
|
||||||
|
|
||||||
|
// Get RPC URL from job parameters
|
||||||
|
Map<String, String> params = getRuntimeContext().getGlobalJobParameters();
|
||||||
|
String rpcUrl = params.getOrDefault("rpc_url", "http://localhost:8545");
|
||||||
|
// TODO: Get from configuration if needed
|
||||||
|
|
||||||
|
// Initialize Web3j
|
||||||
|
this.web3j = Web3j.build(new HttpService(rpcUrl));
|
||||||
|
|
||||||
|
// Dummy credentials for read-only operations
|
||||||
|
this.credentials = Credentials.create("0x0000000000000000000000000000000000000000000000000000000000000001");
|
||||||
|
|
||||||
|
// Default gas provider
|
||||||
|
this.gasProvider = new DefaultGasProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncInvoke(SwapEventLog swap, ResultFuture<ElaboratedSwapEvent> resultFuture) throws Exception {
|
||||||
|
CompletableFuture.supplyAsync(() -> {
|
||||||
|
try {
|
||||||
|
// Load the pool contract
|
||||||
|
UniswapV3Pool pool = UniswapV3Pool.load(
|
||||||
|
swap.getAddress(), // Pool address from the event
|
||||||
|
web3j,
|
||||||
|
credentials,
|
||||||
|
gasProvider
|
||||||
|
);
|
||||||
|
|
||||||
|
// Get token addresses
|
||||||
|
String token0 = pool.token0().send();
|
||||||
|
String token1 = pool.token1().send();
|
||||||
|
|
||||||
|
// Create enriched event
|
||||||
|
return new ElaboratedSwapEvent(swap, token0, token1);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Error fetching pool tokens", e);
|
||||||
|
// Return original without enrichment
|
||||||
|
return new ElaboratedSwapEvent(swap, null, null);
|
||||||
|
}
|
||||||
|
}).thenAccept(enriched -> {
|
||||||
|
resultFuture.complete(Collections.singletonList(enriched));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws Exception {
|
||||||
|
if (web3j != null) {
|
||||||
|
web3j.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user