Add token metadata logging to swap event stream

- Integrated TokenElaborator to enrich swap events with token metadata
- Added comprehensive logging showing pool, token addresses, and metadata
- Increased timeout to handle rate limiting from RPC provider
- Simplified log output removing emoji formatting
This commit is contained in:
sjhavar
2025-10-06 16:03:08 -04:00
committed by surbhi
parent e58fdc0b06
commit 00ae425700
4 changed files with 51 additions and 76 deletions

4
.mvn/jvm.config Normal file
View File

@@ -0,0 +1,4 @@
--add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.lang.reflect=ALL-UNNAMED
--add-opens java.base/java.net=ALL-UNNAMED
--add-opens java.base/java.util=ALL-UNNAMED

8
eval "$(ssh-agent -s)" Normal file
View File

@@ -0,0 +1,8 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABBr7BKqJe
BOIqEijWReA7+HAAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIM9dTaag/jewNYcY
416Fg7f3KjRUaaQ5x44CAWLsZtz4AAAAoCvYBgMQ51N0rrqc1FGsatMFbY+DQxyu4KDSJM
dH0Gr4m/B+3jyxHgQ+RmwRpoGk80Ew4zN6RIrsVZf8chfCybfyxFd/WuxvBIf4f3SVsu8z
k1WZ50YQvG5eATRzrRNDlcDxyyyLeSjn7Au2wEYYVPgzqvO5MzH+/B5PWItaVdb7fGUQ1W
xPRd+ZdJgZ0pIEPmjU440qG06hgYFTS0ywNv0=
-----END OPENSSH PRIVATE KEY-----

View File

@@ -0,0 +1 @@
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIM9dTaag/jewNYcY416Fg7f3KjRUaaQ5x44CAWLsZtz4 surbhi.jhavar@gmail.com

View File

@@ -19,20 +19,19 @@
package stream; package stream;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.ParameterTool; import org.apache.flink.util.ParameterTool;
import stream.contract.ArbitrumOne; import org.apache.flink.util.Collector;
import stream.dto.*; import stream.dto.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import stream.io.PoolElaborator; import stream.io.PoolElaborator;
import stream.io.TokenElaborator; import stream.io.TokenElaborator;
import stream.source.eventlog.EventLogSourceFactory; import stream.source.eventlog.EventLogSourceFactory;
import stream.source.newheads.NewHeadsSourceFactory; import stream.source.newheads.NewHeadsSourceFactory;
import stream.dto.AddressId;
import stream.dto.Token;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
@@ -75,13 +74,6 @@ public class DataStreamJob {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT); mapper.enable(SerializationFeature.INDENT_OUTPUT);
// Test token metadata markup
// https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/datastream/operators/asyncio/
DataStream<AddressId> usdcStream = env.fromData(new AddressId(42161, ArbitrumOne.ADDR_USDC));
SingleOutputStreamOperator<Token> elaboratedUsdcStream =
AsyncDataStream.unorderedWait(usdcStream, new TokenElaborator(), 10, TimeUnit.MINUTES);
// debug print
elaboratedUsdcStream.map(token -> {log.info("Token: {}",token); return token.toString();}).print();
DataStream<ArbitrumOneBlock> arbitrumHeads = env DataStream<ArbitrumOneBlock> arbitrumHeads = env
.fromSource( .fromSource(
@@ -103,83 +95,53 @@ public class DataStreamJob {
TimeUnit.SECONDS TimeUnit.SECONDS
); );
// Print the elaborated swap events with token addresses // Extract token addresses and elaborate with metadata
DataStream<AddressId> tokenAddresses = elaboratedSwapStream
.flatMap(new FlatMapFunction<ElaboratedSwapEvent, AddressId>() {
@Override
public void flatMap(ElaboratedSwapEvent event, Collector<AddressId> collector) throws Exception {
collector.collect(new AddressId(42161, event.getToken0Address()));
collector.collect(new AddressId(42161, event.getToken1Address()));
}
});
// Elaborate tokens with metadata
SingleOutputStreamOperator<Token> elaboratedTokens = AsyncDataStream.unorderedWait(
tokenAddresses,
new TokenElaborator(),
120,
TimeUnit.SECONDS
);
// Print comprehensive swap event data with token metadata
elaboratedSwapStream elaboratedSwapStream
.map(event -> { .map(event -> {
try { try {
String json = mapper.writeValueAsString(event); String json = mapper.writeValueAsString(event);
log.info("Elaborated Swap Event - Token0: {}, Token1: {}", log.info("SWAP EVENT - Pool: {} Block: {} TxHash: {} Token0: {} Token1: {} Amount0: {} Amount1: {} Tick: {}",
event.getToken0Address(), event.getToken1Address()); event.getSwapEvent().address,
event.getSwapEvent().blockNumber,
event.getSwapEvent().transactionHash,
event.getToken0Address(),
event.getToken1Address(),
event.getSwapEvent().amount0,
event.getSwapEvent().amount1,
event.getSwapEvent().tick);
return json; return json;
} catch (Exception e) { } catch (Exception e) {
return "Error converting elaborated swap event to JSON: " + e.getMessage(); return "Error converting elaborated swap event to JSON: " + e.getMessage();
} }
}) })
.print("Elaborated Swap Event: ");
// Map the blocks to pretty-printed JSON strings
/*
blockStream
.map(block -> {
try {
return mapper.writeValueAsString(block);
} catch (Exception e) {
return "Error converting block to JSON: " + e.getMessage();
}
})
.print("New Ethereum Block: ");
transferStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting transfer event to JSON: " + e.getMessage();
}
})
.print("Transfer Event: ");
swapStream
.map(event -> {
try {
return mapper.writeValueAsString(event);
} catch (Exception e) {
return "Error converting swap event to JSON: " + e.getMessage();
}
})
.print("Swap Event: "); .print("Swap Event: ");
*/
// mintStream // Print token metadata when available
// .map(event -> { elaboratedTokens
// try { .map(token -> {
// return mapper.writeValueAsString(event); log.info("TOKEN METADATA - Address: {} Name: {} Symbol: {} Decimals: {}",
// } catch (Exception e) { token.address, token.name, token.symbol, token.decimals);
// return "Error converting mint event to JSON: " + e.getMessage(); return token.toString();
// } })
// }) .print("Token: ");
// .print("Mint Event: ");
// burnStream
// .map(event -> {
// try {
// return mapper.writeValueAsString(event);
// } catch (Exception e) {
// return "Error converting burn event to JSON: " + e.getMessage();
// }
// })
// .print("Burn Event: ");
// swapStream
// .map(event -> {
// try {
// return mapper.writeValueAsString(event);
// } catch (Exception e) {
// return "Error converting swap event to JSON: " + e.getMessage();
// }
// })
// .print("Swap Event: ");
env.execute("Ethereum Block Stream"); env.execute("Ethereum Block Stream");
} }