gmx metadata and backfill in finaldata

This commit is contained in:
tim
2025-06-16 20:04:28 -04:00
parent 88057607d5
commit eef803d3d6
28 changed files with 10234 additions and 25 deletions

View File

@@ -0,0 +1,32 @@
"""GMX
Revision ID: 87dcd5929323
Revises: e47d1bca4b3d
Create Date: 2025-06-16 16:48:11.177904
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import dexorder.database
import dexorder.database.column_types
# revision identifiers, used by Alembic.
revision: str = '87dcd5929323'
down_revision: Union[str, None] = 'e47d1bca4b3d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('token', sa.Column('gmx_synthetic', sa.Boolean(), server_default=sa.text('false'), nullable=False))
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('token', 'gmx_synthetic')
# ### end Alembic commands ###

File diff suppressed because one or more lines are too long

View File

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -49,4 +49,4 @@ def save_addrmeta(address: str, meta: AddressMetadata):
log.warning(f'Address {address} had unknown metadata type {meta["type"]}')
address_metadata: BlockDict[str,AddressMetadata] = BlockDict('a', redis=True, db=True, finalize_cb=save_addrmeta)
address_metadata: BlockDict[str,dict] = BlockDict('a', redis=True, db=True, finalize_cb=save_addrmeta)

View File

@@ -13,6 +13,7 @@ from dexorder.blockstate.fork import current_fork
from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event
from dexorder.final_ohlc import FinalOHLCRepository
from dexorder.gmx import gmx_wire_runner_late, gmx_wire_runner_early
from dexorder.pools import get_uniswap_data
from dexorder.util import hexstr
from dexorder.util.shutdown import fatal
@@ -56,8 +57,10 @@ async def main():
ohlcs = FinalOHLCRepository()
await blockchain.connect()
walker = BlockWalker(flush_callback, timedelta(seconds=config.walker_flush_interval))
gmx_wire_runner_early(walker, backfill=ohlcs)
walker.add_event_trigger(handle_backfill_uniswap_swaps,
get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
gmx_wire_runner_late(walker)
await walker.run()

View File

@@ -142,7 +142,7 @@ class BlockSet(Generic[T], Iterable[T], BlockData[T]):
return self.contains(item)
def __iter__(self) -> Iterator[T]:
yield from (k for k,v in self.iter_items(self.series))
return self.iter_keys(self.series)
class BlockDict(Generic[K,V], BlockData[V]):
@@ -162,6 +162,9 @@ class BlockDict(Generic[K,V], BlockData[V]):
def __contains__(self, item: K) -> bool:
return self.contains(item)
def __iter__(self) -> Iterator[K]:
return self.iter_keys(self.series)
def items(self) -> Iterable[tuple[K,V]]:
return self.iter_items(self.series)

View File

@@ -1,7 +1,7 @@
import logging
from typing import TypedDict, Optional, NotRequired
from sqlalchemy import Index
from sqlalchemy import Index, text
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.column import Address, Blockchain, Uint8
@@ -11,12 +11,26 @@ log = logging.getLogger(__name__)
class TokenDict (TypedDict):
"""
Token metadata dictionary
Fields:
a: The address of the token.
n: The name of the token.
s: The symbol of the token.
d: Number of decimals.
l: Indicates if approved ("listed").
g: gmx synthetic flag
x: Optional extra data.
"""
a: str
n: str
s: str
d: int
w: Optional[bool] # approved ("w"hitelisted)
x: NotRequired[dict] # extra data
l: NotRequired[bool]
g: NotRequired[bool]
x: NotRequired[dict]
# OldTokenDict is the primary dict we use in-memory, with basic JSON-able types
@@ -29,6 +43,7 @@ class OldTokenDict (TypedDict):
symbol: str
decimals: int
approved: bool # whether this token is in the whitelist or not
gmx_synthetic: NotRequired[bool]
x: NotRequired[dict] # extra data
@@ -43,6 +58,7 @@ class Token (Base):
symbol: Mapped[str] = mapped_column(index=True)
decimals: Mapped[Uint8]
approved: Mapped[bool] = mapped_column(index=True)
gmx_synthetic: Mapped[bool] = mapped_column(default=False, server_default=text('false'))
__table_args__ = (
Index('ix_token_name', 'name', postgresql_using='gist'), # full text search on name
@@ -53,10 +69,13 @@ class Token (Base):
def load(token_dict: OldTokenDict) -> 'Token':
return Token(chain=Blockchain.get(token_dict['chain']), address=token_dict['address'],
name=token_dict['name'], symbol=token_dict['symbol'], decimals=token_dict['decimals'],
approved=token_dict['approved'])
approved=token_dict['approved'], gmx_synthetic=token_dict.get('gmx_synthetic', False))
def dump(self):
return OldTokenDict(type='Token', chain=self.chain.chain_id, address=self.address,
token = OldTokenDict(type='Token', chain=self.chain.chain_id, address=self.address,
name=self.name, symbol=self.symbol, decimals=self.decimals, approved=self.approved)
if self.gmx_synthetic:
token['gmx_synthetic'] = True
return token

View File

@@ -158,10 +158,18 @@ async def handle_uniswap_swap(swap: EventData):
return
pool, time, price = data
addr = pool['address']
await update_pool_price(addr, time, price, pool['decimals'])
# log.debug(f'pool {addr} {minutely(time)} {price}')
async def update_pool_price(addr, time, price, decimals):
"""
Price should be an adjusted price with decimals, not the raw price from the pool. The decimals are used to
convert the price back to blockchain format for the triggers.
"""
pool_prices[addr] = price
await ohlcs.update_all(addr, time, price)
await update_price_triggers(pool, price)
# log.debug(f'pool {addr} {minutely(time)} {price}')
update_price_triggers(addr, price, decimals)
async def handle_vault_created(created: EventData):

View File

@@ -242,6 +242,10 @@ class OHLCFileSeries:
self.dirty_files = set()
self.quote: Optional[tuple[datetime,dec]] = None
@property
def exists(self) -> bool:
return self.quote_file is not None or os.path.exists(self.quote_filename)
@property
def quote_filename(self):
@@ -276,6 +280,16 @@ class OHLCFileSeries:
self.dirty_files.add(file)
# noinspection PyShadowingBuiltins
def update_ohlc(self, period: timedelta, time: datetime, open: dec, high: dec, low: dec, close: dec):
file = OHLCFile.get(self.base_dir, OHLCFilePath(self.symbol, period, time))
file.update(time, open)
file.update(time, high)
file.update(time, low)
file.update(time, close)
self.dirty_files.add(file)
def _load(self, time):
#
# load quote file
@@ -359,14 +373,25 @@ class FinalOHLCRepository:
"""
def __init__(self):
assert config.ohlc_dir
self.dirty_series = set()
self.dirty_series: set[OHLCFileSeries] = set()
def update(self, symbol: str, time: datetime, price: Optional[dec]):
series = self.get_series(symbol)
series.update(time, price)
self.dirty_series.add(series)
# noinspection PyShadowingBuiltins
def update_ohlc(self, symbol: str, period: timedelta, time: datetime, open: dec, high: dec, low: dec, close: dec):
series = self.get_series(symbol)
series.update_ohlc(period, time, open, high, low, close)
self.dirty_series.add(series)
@staticmethod
def get_series(symbol):
chain_id = current_chain.get().id
base_dir = os.path.join(config.ohlc_dir, str(chain_id))
series = OHLCFileSeries.get(base_dir, symbol)
series.update(time, price)
self.dirty_series.add(series)
return series
def flush(self) -> None:
for series in self.dirty_series:
@@ -378,3 +403,6 @@ class FinalOHLCRepository:
closing.file.close()
# noinspection PyProtectedMember
OHLCFile._closing.clear()
def has_symbol(self, symbol: str):
return self.get_series(symbol).exists

View File

@@ -0,0 +1,5 @@
from ._base import GMXPool, gmx_prices, gmx_tokens, gmx_pools
from ._chaininfo import gmx_chain_info
from ._handle import gmx_wire_runner_early, gmx_wire_runner_late
from ._metadata import *

77
src/dexorder/gmx/_abi.py Normal file
View File

@@ -0,0 +1,77 @@
import itertools
import logging
import re
from datetime import datetime
from eth_utils import keccak
from dexorder import dec, from_timestamp
from dexorder.util import hexbytes, hexstr
from dexorder.util.abiencode import abi_decoder
log = logging.getLogger(__name__)
def no_ws(s):
return re.sub(r"\s+", "", s)
EventLogDataType = '''
(((string,address)[],(string,address[])[]),
((string,uint256)[],(string,uint256[])[]),
((string,int256)[], (string,int256[])[] ),
((string,bool)[], (string,bool[])[] ),
((string,bytes32)[],(string,bytes32[])[]),
((string,bytes)[], (string,bytes[])[] ),
((string,string)[], (string,string[])[] )
)'''
EventLogType = f'EventLog( address, string, string, {EventLogDataType} )'
EventLog1Type = f'EventLog1( address, string, string, bytes32, {EventLogDataType} )'
EventLog2Type = f'EventLog2( address, string, string, bytes32, bytes32, {EventLogDataType} )'
EventLogTopic = hexstr(keccak(text=no_ws(EventLogType)))
EventLog1Topic = hexstr(keccak(text=no_ws(EventLog1Type)).hex())
EventLog2Topic = hexstr(keccak(text=no_ws(EventLog2Type)).hex())
def topic_hash(signature):
return hexstr(keccak(text=no_ws(signature)))
def parse_event_log_data(event_log_data):
"""Parse GMX event log data into a structured dictionary.
Args:
event_log_data: Raw event log data tuple containing address items, uint items,
int items, bool items, bytes32 items, bytes items and string items
Returns:
dict: Parsed event data with typed values
"""
if type(event_log_data) is str:
event_log_data = hexbytes(event_log_data)
sender, event_name, event_log_data = abi_decoder.decode(('address', 'string', no_ws(EventLogDataType),), event_log_data)
result = {'sender': sender, 'event': event_name}
for items, array_items in event_log_data:
for k, v in items:
result[k] = v
for k, v in array_items:
result[k] = v
return result
class OracleEvent:
def __init__(self, event_log):
event_log_data = event_log['...']
data = parse_event_log_data(event_log_data)
self.token: str = data['token']
self.provider: str = data['provider']
self.min_price: dec = dec(data['minPrice'])
self.max_price: dec = dec(data['maxPrice'])
self.time: datetime = from_timestamp(data['timestamp'])
if __name__ == '__main__':
print(EventLogTopic)
print(EventLog1Topic)
print(EventLog2Topic)

69
src/dexorder/gmx/_base.py Normal file
View File

@@ -0,0 +1,69 @@
import logging
from dataclasses import dataclass
import requests
from ._chaininfo import GMX_API_BASE_URLS
from .. import dec
from ..addrmeta import address_metadata
from ..base.chain import current_chain
from ..blockstate import BlockDict
from ..database.model.token import OldTokenDict
from ..util import json
log = logging.getLogger(__name__)
@dataclass
class GMXPool:
chain_id: int
address: str
index_token: str
long_token: str
short_token: str
min_collateral: dec
disabled: bool = False
@property
def max_leverage(self):
return round(1/self.min_collateral)
@property
def is_enabled(self):
return not self.disabled
def __bool__(self):
return self.is_enabled
def __str__(self):
# noinspection PyTypedDict
name = address_metadata[self.index_token]['symbol'] if self.index_token in address_metadata else self.index_token
# return f'GMX:{name}'
def t(addr):
# noinspection PyTypedDict
return address_metadata[addr]['symbol'] if addr in address_metadata and address_metadata[addr] else addr
return f'GMX:{self.address} ({t(self.index_token)}) {t(self.long_token)}-{t(self.short_token)} {self.max_leverage}x'
def __repr__(self):
return str(self)
@staticmethod
def load(s: str):
d = json.loads(s)
return GMXPool(d['chain_id'], d['address'], d['index_token'], d['long_token'], d['short_token'], dec(d['min_collateral']))
GMX_API_BASE_URL = None
def gmx_api(method, **params):
global GMX_API_BASE_URL
if GMX_API_BASE_URL is None:
GMX_API_BASE_URL = GMX_API_BASE_URLS[current_chain.get().id]
return requests.get(GMX_API_BASE_URL+method, params=params).json()
gmx_tokens: BlockDict[str, OldTokenDict] = BlockDict('gmx_t', redis=True)
gmx_pools: BlockDict[str, GMXPool] = BlockDict('gmx_m', redis=True, str2value=GMXPool.load)
gmx_prices: BlockDict[str, dec] = BlockDict('gmx_p', redis=True, str2value=dec)

View File

@@ -0,0 +1,15 @@
import logging
log = logging.getLogger(__name__)
gmx_chain_info = {
42161: {
'EventEmitter': '0xC8ee91A54287DB53897056e12D9819156D3822Fb',
'DataStore': '0xFD70de6b91282D8017aA4E741e9Ae325CAb992d8',
'Reader': '0x0537C767cDAC0726c76Bb89e92904fe28fd02fE1',
}
}
GMX_API_BASE_URLS={
42161: 'https://arbitrum-api.gmxinfra.io/'
}

View File

@@ -0,0 +1,15 @@
import logging
from functools import cache
from dexorder.contract import ContractProxy
from dexorder.util import json
log = logging.getLogger(__name__)
@cache
def get_gmx_contract(name: str):
with open(f'./resource/abi/42161/gmx/{name}.json') as file:
info = json.load(file)
return ContractProxy(info['address'], abi=info['abi'])

View File

@@ -0,0 +1,18 @@
import logging
from eth_utils import keccak
from dexorder.util.abiencode import abi_encoder
log = logging.getLogger(__name__)
def combo_key(key_str, arg, arg_type='address'):
key_bytes = keccak(abi_encoder.encode(['string'], [key_str]))
return keccak(abi_encoder.encode(['bytes32', arg_type], [key_bytes, arg]))
IS_MARKET_DISABLED_KEY = 'IS_MARKET_DISABLED'
MIN_COLLATERAL_FACTOR_KEY = 'MIN_COLLATERAL_FACTOR'
async def is_market_disabled(ds, market):
return await ds.getBool(combo_key(IS_MARKET_DISABLED_KEY, market.address))

227
src/dexorder/gmx/_handle.py Normal file
View File

@@ -0,0 +1,227 @@
import asyncio
import logging
from datetime import timedelta
from ._abi import parse_event_log_data
from ._base import gmx_api
from ._chaininfo import gmx_chain_info
from ._metadata import gmx_update_metadata, gmx_token_symbol_map
from .. import dec, from_timestamp
from ..event_handler import update_pool_price
from ..final_ohlc import FinalOHLCRepository
from ..ohlc import period_name
from ..periodic import periodic
from ..progressor import BlockProgressor
from ..tokens import get_token
log = logging.getLogger(__name__)
def gmx_wire_runner_early(runner: BlockProgressor, backfill: FinalOHLCRepository=None):
runner.add_event_trigger(gmx_update_metadata)
runner.add_event_trigger(create_backfill_handler(backfill) if backfill else gmx_handle_price_update)
runner.add_event_trigger(handle_gmx_events, log_filter={'address':gmx_chain_info[42161]['EventEmitter'], })
def gmx_wire_runner_late(runner: BlockProgressor):
pass
def handle_marketpoolvalueupdated_event(event: dict):
# log.info(f'marketpoolvalueupdated: {event}')
# {
# 'sender': '0x3f6df0c3a7221ba1375e87e7097885a601b41afc',
# 'event': 'MarketPoolValueUpdated',
# 'market': '0x3680d7bfe9260d3c5de81aeb2194c119a59a99d1',
# 'longTokenAmount': 19476307269091870091,
# 'shortTokenAmount': 50700920349,
# 'longTokenUsd': 53158489854101648408924993481922790,
# 'shortTokenUsd': 50693714261673382360507024950000000,
# 'totalBorrowingFees': 41601639378800206440170070581268,
# 'borrowingFeePoolFactor': 630000000000000000000000000000,
# 'impactPoolAmount': 3152510497,
# 'marketTokensSupply': 93026785041268146396614,
# 'poolValue': 102896287811364604212637168767403111,
# 'longPnl': 142313435006510688238425736783147,
# 'shortPnl': -19431172516799270439150252797270,
# 'netPnl': 122882262489711417799275483985877,
# 'actionType': b'`y\x91\xfcYc\xe2d\xf1\xa9O\xaa\x12lcH/\xdcZ\xf1Jeo\x08u\x1d\xc8\xb0\xc5\xd4v0',
# 'tradeKey': b'Z]Qws\xaf\x115\x89\x85\xdd\xce)\x93t\xc4C\xccx{\x85N\xa0\x17B\x99r#\xd9~\x94\xd1'
# }
pass
event_handlers = {
'OraclePriceUpdate': None,
'MarketPoolValueInfo': None,
'MarketPoolValueUpdated': handle_marketpoolvalueupdated_event,
'OrderCreated': None,
'OrderUpdated': None,
'OrderCancelled': None,
'OrderExecuted': None,
'OrderCollateralDeltaAmountAutoUpdated': None,
'PositionIncrease': None,
'PositionDecrease': None,
'PositionFeesCollected': None,
'OpenInterestInTokensUpdated': None,
'OpenInterestUpdated': None,
'CollateralSumUpdated': None,
'ClaimableFeeAmountUpdated': None,
'ClaimableFundingUpdated': None,
'ClaimableFundingAmountPerSizeUpdated': None,
'FundingFeesClaimed': None,
'PoolAmountUpdated': None,
'VirtualSwapInventoryUpdated': None,
'SwapInfo': None,
'SwapFeesCollected': None,
'SwapImpactPoolAmountUpdated': None,
'PositionImpactPoolAmountUpdated': None,
'VirtualPositionInventoryUpdated': None,
'CumulativeBorrowingFactorUpdated': None,
'KeeperExecutionFee': None,
'ExecutionFeeRefund': None,
'FundingFeeAmountPerSizeUpdated': None,
'SetUint': None,
# SetBytes32 presumably and others...
'SyncConfig': None,
'DepositCreated': None,
'DepositExecuted': None,
'WithdrawalCreated': None,
'WithdrawalExecuted': None,
'ShiftCreated': None,
'ShiftExecuted': None,
'GlvValueUpdated': None,
'GlvDepositCreated': None,
'GlvDepositExecuted': None,
'GlvWithdrawalCreated': None,
'GlvWithdrawalExecuted': None,
'GlvShiftCreated': None,
'GlvShiftExecuted': None,
'AffiliateRewardUpdated': None,
'IncrementSubaccountActionCount': None,
}
async def handle_gmx_events(events: list[dict]):
for event in events:
data = parse_event_log_data(event['data'])
event_name = data['event']
try:
event_handlers[event_name](data)
except KeyError:
log.debug(f'Unknown event {event_name}')
except TypeError:
pass
initialized = False
@periodic(timedelta(hours=1))
async def gmx_handle_metadata_update():
global initialized
# noinspection PyBroadException
try:
await gmx_update_metadata()
initialized = True
except:
if not initialized:
raise
log.exception('Exception in gmx_handle_metadata_update()')
@periodic(timedelta(seconds=1))
async def gmx_handle_price_update():
updates = await fetch_price_updates()
# ticker updates have only one price per addr so we can parallelize setting prices
await asyncio.gather(*[update_pool_price(addr, time, price, 30) for addr, time, price in updates])
def create_backfill_handler(ohlcs: FinalOHLCRepository):
@periodic(timedelta(seconds=1))
async def gmx_handle_price_update_with_backfill():
updates = await fetch_price_updates()
backfill_addrs = [addr for addr, time, price in updates if not ohlcs.has_symbol(addr)]
backfill_addrs = [backfill_addrs[0]] # todo remove debug
if backfill_addrs:
log.info(f'Backfilling {len(backfill_addrs)} new GMX tokens')
await asyncio.gather(*[backfill_token(ohlcs, a) for a in backfill_addrs])
for addr, time, price in updates:
ohlcs.update(addr, time, price)
return gmx_handle_price_update_with_backfill
def push_candle(ohlcs, addr, period, candle):
time, *prices = candle
time = from_timestamp(time)
prices = [dec(p) for p in prices]
ohlcs.update_ohlc(addr, period, time, *prices)
GMX_OHLC_PERIODS = [
timedelta(minutes=1),
timedelta(minutes=5),
timedelta(minutes=15),
timedelta(hours=1),
timedelta(hours=4),
timedelta(days=1),
]
async def backfill_token(ohlcs: FinalOHLCRepository, addr: str):
token = await get_token(addr)
addr = token['address']
for period in GMX_OHLC_PERIODS:
# Polling a large window is the only history method GMX provides :( It's also how their web client works!
symbol = gmx_token_symbol_map[addr]
interval = period_name(period).lower()
response = gmx_api('prices/candles', tokenSymbol=symbol, period=interval, limit=10_000)
if 'error' in response:
if not response['error'].startswith('unsupported period'):
log.warning(f'Could not query token backfill for {token["symbol"]}: {response["error"]}')
else:
for c in reversed(response['candles']):
push_candle(ohlcs, addr, period, c)
log.info(f'Backfilled new GMX token {token["symbol"]}')
async def fetch_price_updates():
updates = []
for t in gmx_api('prices/tickers'):
"""
{
"tokenAddress": "0x3Eea56A1ccCdbfB70A26aD381C71Ee17E4c8A15F",
"tokenSymbol": "BOME",
"minPrice": "1621019778803375000000",
"maxPrice": "1621534421901125000000",
"updatedAt": 1749849326251,
"timestamp": 1749849325
},
"""
addr = t['tokenAddress']
# GMX prices use 30 decimal places
price = (dec(t['minPrice']) + dec(t['maxPrice'])) / 2 * dec(10) ** dec(-30)
time = from_timestamp(t['timestamp'])
updates.append((addr, time, price))
return updates

View File

@@ -0,0 +1,94 @@
__all__ = ['gmx_update_metadata', 'gmx_token_symbol_map']
import asyncio
import logging
import re
from copy import copy
from pprint import pprint
from dexorder import dec, ADDRESS_0
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.database.model.token import OldTokenDict
from dexorder.gmx._base import GMXPool, gmx_tokens, gmx_pools, gmx_api
from dexorder.gmx._contract import get_gmx_contract
from dexorder.gmx._datastore import combo_key, MIN_COLLATERAL_FACTOR_KEY, is_market_disabled
from dexorder.tokens import get_token
log = logging.getLogger(__name__)
async def gmx_update_metadata():
await gmx_detect_tokens()
await gmx_detect_pools()
async def gmx_detect_tokens():
chain_id = current_chain.get().id
response = gmx_api('tokens')
if 'tokens' not in response:
raise ValueError('No tokens in GMX response')
tokens = []
for info in response['tokens']:
synthetic = info.get('synthetic',False)
name = f'GMX {info["symbol"]}'
if synthetic:
name += ' Synthetic'
token = OldTokenDict(type='Token', chain=chain_id, address=info['address'], name=name,
symbol=info['symbol'], decimals=info['decimals'], approved=True, gmx_synthetic=synthetic)
if re.search(r'deprecated', info['symbol'], re.IGNORECASE):
continue
gmx_token_symbol_map[token['address']] = token['symbol']
if synthetic and token['address'] not in address_metadata:
# noinspection PyTypeChecker
address_metadata[info['address']] = token
gmx_tokens[token['address']] = token
tokens.append(token)
# Delete any tokens that are no longer listed
valid_addrs = set(t['address'] for t in tokens)
invalid_addrs = set(md['address'] for md in gmx_tokens.values() if md['address'] not in valid_addrs)
for addr in invalid_addrs:
log.info(f'Deleting invalid GMX token {gmx_tokens[addr]}')
del gmx_tokens[addr]
async def gmx_detect_pools():
ds = get_gmx_contract('DataStore')
reader = get_gmx_contract('Reader')
market_info = await reader.getMarkets(ds.address, 0, 1000)
markets = [
GMXPool(42161, market_token, index_token, long_token, short_token, dec('nan'))
for market_token, long_token, short_token, index_token in market_info
]
# some pools have ADDRESS_0 as the long token. wat?
markets = [m for m in markets if m.long_token != ADDRESS_0 and m.short_token != ADDRESS_0 and m.index_token != ADDRESS_0 and m.address != ADDRESS_0]
market_disabled = await asyncio.gather(*[is_market_disabled(ds, m) for m in markets])
markets = [m for m,d in zip(markets, market_disabled) if not d]
valid_addrs = set(m.address for m in markets)
invalid_addrs = set(p.address for p in gmx_pools.values() if p.address not in valid_addrs)
new_markets = [m for m in markets if m.address not in gmx_pools]
async def init_market(m):
result = await ds.getUint(combo_key(MIN_COLLATERAL_FACTOR_KEY, m.address))
if result == 0:
# raise ValueError(f'no min collateral factor for market {m.address}')
log.warning(f'no min collateral factor for market {m.address}')
m.min_collateral = 2 * dec(result) / dec(1e30)
gmx_pools[m.address] = m
await asyncio.gather(*[init_market(m) for m in new_markets])
token_addrs = set(t for m in new_markets for t in (m.address, m.index_token, m.long_token, m.short_token))
await asyncio.gather(*[get_token(t) for t in token_addrs])
# Disable any markets that are not longer valid
for addr in invalid_addrs:
log.info(f'Disabling GMX market {gmx_pools[addr]}')
updated = copy(gmx_pools[addr])
updated.disabled = True
gmx_pools[addr] = updated
pprint(new_markets)
return markets
gmx_token_symbol_map: dict[str, str] = {} # maps addresses to GMX token symbols

View File

@@ -164,7 +164,7 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti
returns an ordered list of OHLC's that have been created/modified by the new time/price
if price is None, then bars are advanced based on the time but no new price is added to the series.
"""
# log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
cur = prev
if time < cur.start:
# data corruption. just shut down
@@ -177,13 +177,13 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti
break
result.append(cur)
cur = NativeOHLC(end, None, None, None, cur.close)
# log.debug(f'\ttime advancements: {result}')
log.debug(f'\ttime advancements: {result}')
# if we are setting a price, update the current bar
if price is not None:
cur.update(price)
result.append(cur)
# log.debug(f'\tappended current bar: {cur.ohlc}')
# log.debug(f'\tupdate result: {result}')
log.debug(f'\tappended current bar: {cur.ohlc}')
log.debug(f'\tupdate result: {result}')
return result
class OHLCKey (NamedTuple):
@@ -342,11 +342,17 @@ class OHLCRepository:
def add_symbol(symbol: str, period: timedelta = None):
if period is not None:
if (symbol, period) not in recent_ohlcs:
recent_ohlcs[(symbol, period)] = [] # setting an empty value will initiate price capture
recent_ohlcs[OHLCKey(symbol, period)] = [] # setting an empty value will initiate price capture
else:
for period in OHLC_PERIODS:
if (symbol, period) not in recent_ohlcs:
recent_ohlcs[(symbol, period)] = []
recent_ohlcs[OHLCKey(symbol, period)] = []
@staticmethod
def has_symbol(symbol: str, period: timedelta):
return OHLCKey(symbol, period) in recent_ohlcs
async def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True):
""" the update_all() and update() methods generate bars for the recent_ohlcs BlockDict """
@@ -364,7 +370,7 @@ class OHLCRepository:
# log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
if price is not None:
self.quotes[symbol] = timestamp(time), str(price)
key = symbol, period
key = OHLCKey(symbol, period)
# recent_ohlcs holds a list of "recent" NativeOHLC's stored as blockdata. we try to keep the recent array long
# enough to extend prior the root block time
historical: Optional[list[NativeOHLC]] = recent_ohlcs.get(key)

View File

@@ -111,10 +111,10 @@ async def update_balance_triggers(vault: str, token: str):
await asyncio.gather(*updates)
async def update_price_triggers(pool: OldPoolDict, price: dec):
price = price * dec(10) ** dec(-pool['decimals']) # adjust for pool decimals to get onchain price
def update_price_triggers(addr: str, price: dec, decimals: int):
price = price * dec(10) ** dec(-decimals) # adjust for pool decimals to get onchain price
price = float(price) # since we use SIMD operations to evaluate lines, we must convert to float
for pt in PriceLineTrigger.by_pool.get(pool['address'], []):
for pt in PriceLineTrigger.by_pool.get(addr, []):
pt.update(price)

36
src/dexorder/periodic.py Normal file
View File

@@ -0,0 +1,36 @@
from datetime import timedelta
import time
import asyncio
from functools import wraps
def periodic(period: timedelta|float):
"""
Decorator to allow only one execution of a function or coroutine per period.
Works for both sync and async functions.
"""
def decorator(func):
last_called = {'time': 0.}
period_seconds = period.total_seconds() if isinstance(period, timedelta) else period
@wraps(func)
def sync_wrapper(*args, **kwargs):
now = time.monotonic()
if now - last_called['time'] >= period_seconds:
last_called['time'] = now
return func(*args, **kwargs)
return None
@wraps(func)
async def async_wrapper(*args, **kwargs):
now = time.monotonic()
if now - last_called['time'] >= period_seconds:
last_called['time'] = now
return await func(*args, **kwargs)
return None
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator

View File

@@ -40,8 +40,8 @@ class BlockProgressor(metaclass=ABCMeta):
def add_event_trigger(self,
# callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
callback: Union[
Callable[[EventData], Maywaitable[None]],
Callable[[list[EventData]], Maywaitable[None]],
Callable[[EventData|dict], Maywaitable[None]],
Callable[[list[EventData|dict]], Maywaitable[None]],
Callable[[], Maywaitable[None]],
],
event: ContractEvents = None,

View File

@@ -316,7 +316,7 @@ class BlockStateRunner(BlockProgressor):
else:
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
has_logs = any(bytes(hexbytes(topic)) in bloom for topic in lf['topics'])
has_logs = 'topics' not in lf or any(bytes(hexbytes(topic)) in bloom for topic in lf['topics'])
# log.debug(f'has {event.__class__.__name__}? {has_logs}')
if not has_logs:
get_logs = None

View File

@@ -52,6 +52,7 @@ async def get_token(address) -> Optional[OldTokenDict]:
# noinspection PyTypeChecker
return address_metadata[address]
except KeyError:
# noinspection PyTypeChecker
result = address_metadata[address] = await load_token(address)
return result