import logging import sys from time import sleep import pandas as pd from web3 import Web3 log = logging.getLogger(__name__) def process_transaction(tx, w3): if ( # tx['to'] and # tx['to'].lower() == '0xA51afAFe0263b40EdaEf0Df8781eA9aa03E381a3'.lower() and tx['input'].startswith(bytes.fromhex('3593564c')) and (receipt := w3.eth.get_transaction_receipt(tx.hash))['status'] == 1 ): return receipt['gasUsed'] return None if __name__ == '__main__': if len(sys.argv) != 2: print('Usage: python uniswap4_gas.py ') rpc_url = sys.argv[1] logging.basicConfig(level=logging.INFO) log.setLevel(logging.DEBUG) num_blocks = 10000 w3 = Web3(Web3.HTTPProvider(rpc_url, request_kwargs={'timeout':15})) gas_data = pd.DataFrame(columns=['gas']) end_block = w3.eth.block_number start_block = end_block - num_blocks try: # Process events in pages of 2000 blocks PAGE_SIZE = 2000 events = [] for block in range(start_block, end_block + 1, PAGE_SIZE): current_end = min(block + PAGE_SIZE - 1, end_block) log.info(f'Querying events for blocks {block} to {current_end}...') event_filter = w3.eth.filter({ 'fromBlock': block, 'toBlock': current_end, # 'address': '0x011f31D20C8778c8Beb1093b73E3A5690Ee6271b', 'topics': ['0x40e9cecb9f5f1f1c5b9c97dec2917b7ee92e57ba5563708daca94dd84ad7112f'], }) page_events = event_filter.get_all_entries() events.extend(page_events) log.info(f'Found {len(page_events)} events in this page') sleep(0.5) # Add small delay between requests for event in events: try: log.info(f"Processing transaction from block {event['blockNumber']}") tx = w3.eth.get_transaction(event['transactionHash']) gas_used = process_transaction(tx, w3) if gas_used: new_data = pd.DataFrame({'gas': [gas_used]}) gas_data = pd.concat([gas_data, new_data], ignore_index=True) log.info(f"Transaction {event['transactionHash'].hex()}: Gas used {gas_used}") except Exception as e: log.error(f"Error processing transaction: {str(e)}") finally: sleep(0.2) finally: gas_data.to_csv('gas_data.csv', index=False)