From 00ae425700b9ac62ead7aa844d1feb729151c62d Mon Sep 17 00:00:00 2001 From: sjhavar Date: Mon, 6 Oct 2025 16:03:08 -0400 Subject: [PATCH] Add token metadata logging to swap event stream - Integrated TokenElaborator to enrich swap events with token metadata - Added comprehensive logging showing pool, token addresses, and metadata - Increased timeout to handle rate limiting from RPC provider - Simplified log output removing emoji formatting --- .mvn/jvm.config | 4 + "eval \"$(ssh-agent -s)\"" | 8 ++ "eval \"$(ssh-agent -s)\".pub" | 1 + src/main/java/stream/DataStreamJob.java | 114 ++++++++---------------- 4 files changed, 51 insertions(+), 76 deletions(-) create mode 100644 .mvn/jvm.config create mode 100644 "eval \"$(ssh-agent -s)\"" create mode 100644 "eval \"$(ssh-agent -s)\".pub" diff --git a/.mvn/jvm.config b/.mvn/jvm.config new file mode 100644 index 0000000..8a5996e --- /dev/null +++ b/.mvn/jvm.config @@ -0,0 +1,4 @@ +--add-opens java.base/java.lang=ALL-UNNAMED +--add-opens java.base/java.lang.reflect=ALL-UNNAMED +--add-opens java.base/java.net=ALL-UNNAMED +--add-opens java.base/java.util=ALL-UNNAMED \ No newline at end of file diff --git "a/eval \"$(ssh-agent -s)\"" "b/eval \"$(ssh-agent -s)\"" new file mode 100644 index 0000000..3857e1c --- /dev/null +++ "b/eval \"$(ssh-agent -s)\"" @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABBr7BKqJe +BOIqEijWReA7+HAAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIM9dTaag/jewNYcY +416Fg7f3KjRUaaQ5x44CAWLsZtz4AAAAoCvYBgMQ51N0rrqc1FGsatMFbY+DQxyu4KDSJM +dH0Gr4m/B+3jyxHgQ+RmwRpoGk80Ew4zN6RIrsVZf8chfCybfyxFd/WuxvBIf4f3SVsu8z +k1WZ50YQvG5eATRzrRNDlcDxyyyLeSjn7Au2wEYYVPgzqvO5MzH+/B5PWItaVdb7fGUQ1W +xPRd+ZdJgZ0pIEPmjU440qG06hgYFTS0ywNv0= +-----END OPENSSH PRIVATE KEY----- diff --git "a/eval \"$(ssh-agent -s)\".pub" "b/eval \"$(ssh-agent -s)\".pub" new file mode 100644 index 0000000..edbbb8e --- /dev/null +++ "b/eval \"$(ssh-agent -s)\".pub" @@ -0,0 +1 @@ +ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIM9dTaag/jewNYcY416Fg7f3KjRUaaQ5x44CAWLsZtz4 surbhi.jhavar@gmail.com diff --git a/src/main/java/stream/DataStreamJob.java b/src/main/java/stream/DataStreamJob.java index 1182307..eb690c5 100644 --- a/src/main/java/stream/DataStreamJob.java +++ b/src/main/java/stream/DataStreamJob.java @@ -19,20 +19,19 @@ package stream; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.ParameterTool; -import stream.contract.ArbitrumOne; +import org.apache.flink.util.Collector; import stream.dto.*; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import stream.io.PoolElaborator; import stream.io.TokenElaborator; import stream.source.eventlog.EventLogSourceFactory; import stream.source.newheads.NewHeadsSourceFactory; -import stream.dto.AddressId; -import stream.dto.Token; import java.io.IOException; import java.net.URI; @@ -75,13 +74,6 @@ public class DataStreamJob { ObjectMapper mapper = new ObjectMapper(); mapper.enable(SerializationFeature.INDENT_OUTPUT); - // Test token metadata markup - // https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/datastream/operators/asyncio/ - DataStream usdcStream = env.fromData(new AddressId(42161, ArbitrumOne.ADDR_USDC)); - SingleOutputStreamOperator elaboratedUsdcStream = - AsyncDataStream.unorderedWait(usdcStream, new TokenElaborator(), 10, TimeUnit.MINUTES); - // debug print - elaboratedUsdcStream.map(token -> {log.info("Token: {}",token); return token.toString();}).print(); DataStream arbitrumHeads = env .fromSource( @@ -103,83 +95,53 @@ public class DataStreamJob { TimeUnit.SECONDS ); - // Print the elaborated swap events with token addresses + // Extract token addresses and elaborate with metadata + DataStream tokenAddresses = elaboratedSwapStream + .flatMap(new FlatMapFunction() { + @Override + public void flatMap(ElaboratedSwapEvent event, Collector collector) throws Exception { + collector.collect(new AddressId(42161, event.getToken0Address())); + collector.collect(new AddressId(42161, event.getToken1Address())); + } + }); + + // Elaborate tokens with metadata + SingleOutputStreamOperator elaboratedTokens = AsyncDataStream.unorderedWait( + tokenAddresses, + new TokenElaborator(), + 120, + TimeUnit.SECONDS + ); + + // Print comprehensive swap event data with token metadata elaboratedSwapStream .map(event -> { try { String json = mapper.writeValueAsString(event); - log.info("Elaborated Swap Event - Token0: {}, Token1: {}", - event.getToken0Address(), event.getToken1Address()); + log.info("SWAP EVENT - Pool: {} Block: {} TxHash: {} Token0: {} Token1: {} Amount0: {} Amount1: {} Tick: {}", + event.getSwapEvent().address, + event.getSwapEvent().blockNumber, + event.getSwapEvent().transactionHash, + event.getToken0Address(), + event.getToken1Address(), + event.getSwapEvent().amount0, + event.getSwapEvent().amount1, + event.getSwapEvent().tick); return json; } catch (Exception e) { return "Error converting elaborated swap event to JSON: " + e.getMessage(); } }) - .print("Elaborated Swap Event: "); - - - // Map the blocks to pretty-printed JSON strings -/* - blockStream - .map(block -> { - try { - return mapper.writeValueAsString(block); - } catch (Exception e) { - return "Error converting block to JSON: " + e.getMessage(); - } - }) - .print("New Ethereum Block: "); - - transferStream - .map(event -> { - try { - return mapper.writeValueAsString(event); - } catch (Exception e) { - return "Error converting transfer event to JSON: " + e.getMessage(); - } - }) - .print("Transfer Event: "); - - swapStream - .map(event -> { - try { - return mapper.writeValueAsString(event); - } catch (Exception e) { - return "Error converting swap event to JSON: " + e.getMessage(); - } - }) .print("Swap Event: "); -*/ -// mintStream -// .map(event -> { -// try { -// return mapper.writeValueAsString(event); -// } catch (Exception e) { -// return "Error converting mint event to JSON: " + e.getMessage(); -// } -// }) -// .print("Mint Event: "); - -// burnStream -// .map(event -> { -// try { -// return mapper.writeValueAsString(event); -// } catch (Exception e) { -// return "Error converting burn event to JSON: " + e.getMessage(); -// } -// }) -// .print("Burn Event: "); - -// swapStream -// .map(event -> { -// try { -// return mapper.writeValueAsString(event); -// } catch (Exception e) { -// return "Error converting swap event to JSON: " + e.getMessage(); -// } -// }) -// .print("Swap Event: "); + // Print token metadata when available + elaboratedTokens + .map(token -> { + log.info("TOKEN METADATA - Address: {} Name: {} Symbol: {} Decimals: {}", + token.address, token.name, token.symbol, token.decimals); + return token.toString(); + }) + .print("Token: "); env.execute("Ethereum Block Stream"); }