Compare commits: eef803d3d6...master (1 commit: 4936150c3b)
@@ -1,32 +0,0 @@
-"""GMX
-
-Revision ID: 87dcd5929323
-Revises: e47d1bca4b3d
-Create Date: 2025-06-16 16:48:11.177904
-
-"""
-from typing import Sequence, Union
-
-from alembic import op
-import sqlalchemy as sa
-import dexorder.database
-import dexorder.database.column_types
-
-
-# revision identifiers, used by Alembic.
-revision: str = '87dcd5929323'
-down_revision: Union[str, None] = 'e47d1bca4b3d'
-branch_labels: Union[str, Sequence[str], None] = None
-depends_on: Union[str, Sequence[str], None] = None
-
-
-def upgrade() -> None:
-    # ### commands auto generated by Alembic - please adjust! ###
-    op.add_column('token', sa.Column('gmx_synthetic', sa.Boolean(), server_default=sa.text('false'), nullable=False))
-    # ### end Alembic commands ###
-
-
-def downgrade() -> None:
-    # ### commands auto generated by Alembic - please adjust! ###
-    op.drop_column('token', 'gmx_synthetic')
-    # ### end Alembic commands ###
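Note: because this compare deletes the migration file outright, a database already stamped at revision 87dcd5929323 has to be stepped back to e47d1bca4b3d before the file disappears, or Alembic's version table will point at a revision it can no longer resolve. A hedged sketch using Alembic's command API (the alembic.ini path is an assumption):

```python
from alembic import command
from alembic.config import Config

# Hypothetical config location; point this at the project's real alembic.ini.
cfg = Config('alembic.ini')

# Downgrade below the revision being deleted, then remove the file.
command.downgrade(cfg, 'e47d1bca4b3d')
```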
File diff suppressed because one or more lines are too long (6 files)
@@ -49,4 +49,4 @@ def save_addrmeta(address: str, meta: AddressMetadata):
         log.warning(f'Address {address} had unknown metadata type {meta["type"]}')
 
 
-address_metadata: BlockDict[str,dict] = BlockDict('a', redis=True, db=True, finalize_cb=save_addrmeta)
+address_metadata: BlockDict[str,AddressMetadata] = BlockDict('a', redis=True, db=True, finalize_cb=save_addrmeta)
@@ -42,10 +42,12 @@ class Account (LocalAccount):
         # log.debug(f'available accounts: {Account._pool.qsize()}')
         try:
             async with asyncio.timeout(1):
-                result = await Account._pool.get()
+                result: "Account" = await Account._pool.get()
         except asyncio.TimeoutError:
             log.error('waiting for an available account')
             result = await Account._pool.get()
+        # mark as out of pool
+        result._in_pool = False
         metric.account_available.set(Account._pool.qsize())
         return result
 
@@ -59,17 +61,20 @@ class Account (LocalAccount):
         if Account._main_account is None:
             Account._main_account = account
         Account._pool.put_nowait(account)
+        account._in_pool = True  # this account is now in the pool
         Account._all.append(account)
         metric.account_available.set(Account._pool.qsize())
         metric.account_total.set(len(Account._all))
         log.info(f'Account pool {[a.address for a in Account._all]}')
 
-    def __init__(self, local_account: LocalAccount):  # todo chain_id?
-        super().__init__(local_account._key_obj, local_account._publicapi)  # from digging into the source code
+    def __init__(self, local_account: LocalAccount):  # todo chain_id?
+        super().__init__(local_account._key_obj, local_account._publicapi)  # from digging into the source code
         self.chain_id = current_chain.get().id
         self.signing_middleware = construct_sign_and_send_raw_middleware(self)
         self._nonce: Optional[int] = None
         self.tx_id: Optional[str] = None  # current transaction id
+        # release() idempotency tracking
+        self._in_pool: bool = False
 
     async def next_nonce(self):
         if self._nonce is None:
@@ -86,8 +91,21 @@ class Account (LocalAccount):
         return current_w3.get().eth.get_balance(self.address)
 
     def release(self):
-        metric.account_available.set(Account._pool.qsize() + 1)
+        """
+        Return this Account to the pool.
+
+        Idempotent: calling release() multiple times without a new acquire()
+        will only enqueue the account once.
+        """
+        # If we're already in the pool, do nothing.
+        if self._in_pool:
+            # Optional debug log; comment out if too noisy.
+            # log.debug(f'Account {self.address} already in pool; ignoring extra release()')
+            return
+
         Account._pool.put_nowait(self)
+        self._in_pool = True
+        metric.account_available.set(Account._pool.qsize())
 
     def __str__(self):
         return self.address
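The release() rewrite above makes returning an account to the pool idempotent via an _in_pool flag. A self-contained sketch of the same pattern (a simplified stand-in, not the project's Account class):

```python
import asyncio

class PooledResource:
    """Minimal pooled object with an idempotent release()."""
    _pool: "asyncio.Queue[PooledResource]" = asyncio.Queue()

    def __init__(self) -> None:
        self._in_pool = False

    def release(self) -> None:
        # A second release() without a new acquire is a no-op, so the
        # queue can never hold the same resource twice.
        if self._in_pool:
            return
        PooledResource._pool.put_nowait(self)
        self._in_pool = True

async def demo() -> None:
    r = PooledResource()
    r.release()
    r.release()  # ignored: already in the pool
    print(PooledResource._pool.qsize())  # 1

asyncio.run(demo())
```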
@@ -13,7 +13,6 @@ from dexorder.blockstate.fork import current_fork
 from dexorder.configuration import parse_args
 from dexorder.contract import get_contract_event
 from dexorder.final_ohlc import FinalOHLCRepository
-from dexorder.gmx import gmx_wire_runner_late, gmx_wire_runner_early
 from dexorder.pools import get_uniswap_data
 from dexorder.util import hexstr
 from dexorder.util.shutdown import fatal
@@ -57,10 +56,8 @@ async def main():
     ohlcs = FinalOHLCRepository()
     await blockchain.connect()
     walker = BlockWalker(flush_callback, timedelta(seconds=config.walker_flush_interval))
-    gmx_wire_runner_early(walker, backfill=ohlcs)
     walker.add_event_trigger(handle_backfill_uniswap_swaps,
                              get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
-    gmx_wire_runner_late(walker)
     await walker.run()
 
 
@@ -142,7 +142,7 @@ class BlockSet(Generic[T], Iterable[T], BlockData[T]):
         return self.contains(item)
 
     def __iter__(self) -> Iterator[T]:
-        return self.iter_keys(self.series)
+        yield from (k for k,v in self.iter_items(self.series))
 
 
 class BlockDict(Generic[K,V], BlockData[V]):
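The __iter__ change above is worth noting: adding yield from turns the method into a generator function that derives keys from the item stream instead of returning whatever iter_keys produced. A minimal sketch of the pattern, with a hypothetical pair-yielding backing store:

```python
from typing import Iterable, Iterator, Tuple

class PairBacked:
    """Toy container whose storage yields (key, value) pairs, like iter_items()."""
    def __init__(self, items: Iterable[Tuple[str, int]]):
        self._items = list(items)

    def iter_items(self) -> Iterator[Tuple[str, int]]:
        yield from self._items

    def __iter__(self) -> Iterator[str]:
        # Generator form: keys are produced lazily from the item stream,
        # so key iteration always agrees with the items themselves.
        yield from (k for k, _ in self.iter_items())

print(list(PairBacked([('a', 1), ('b', 2)])))  # ['a', 'b']
```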
@@ -162,9 +162,6 @@ class BlockDict(Generic[K,V], BlockData[V]):
     def __contains__(self, item: K) -> bool:
         return self.contains(item)
 
-    def __iter__(self) -> Iterator[K]:
-        return self.iter_keys(self.series)
-
     def items(self) -> Iterable[tuple[K,V]]:
         return self.iter_items(self.series)
 
@@ -33,7 +33,8 @@ class ContractTransaction:
     async def wait(self) -> TxReceipt:
         if self.receipt is None:
             self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
-            self.account.release()
+            if self.account is not None:
+                self.account.release()
         return self.receipt
 
     async def sign(self, account: Account):
@@ -153,10 +154,14 @@ class ContractProxy:
     def __getattr__(self, item):
         if item == 'constructor':
             found = self.contract.constructor
-        elif item in self.contract.functions:
-            found = self.contract.functions[item]
         else:
-            raise AttributeError(item)
+            funcs = self.contract.functions
+            # In web3.py v6+, contract functions are exposed as attributes, not via __getitem__.
+            # Using getattr ensures we obtain the callable factory for the function; indexing may return None.
+            # Additionally, guard against unexpected None to fail fast with a clear error.
+            found = getattr(funcs, item, None)
+            if not callable(found):
+                raise AttributeError(f"Function '{item}' not found on contract {self._interface_name} at {self.address}")
         return self._wrapper(self.address, item, found)
 
     def __repr__(self):
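For context, the new __getattr__ body relies on attribute access against the contract's functions namespace rather than item lookup. A standalone sketch of the lookup-and-guard pattern (hypothetical minimal proxy, not web3.py itself):

```python
class FunctionsNamespace:
    """Stand-in for a functions container that exposes one attribute per function."""
    def transfer(self, to: str, amount: int):
        return ('transfer', to, amount)

class MiniProxy:
    def __init__(self):
        self.functions = FunctionsNamespace()

    def __getattr__(self, item):
        # getattr with a default distinguishes "missing" from "present";
        # raising AttributeError keeps hasattr() and friends well-behaved.
        found = getattr(self.functions, item, None)
        if not callable(found):
            raise AttributeError(item)
        return found

proxy = MiniProxy()
print(proxy.transfer('0xabc', 1))  # ('transfer', '0xabc', 1)
print(hasattr(proxy, 'missing'))   # False
```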
@@ -1,7 +1,7 @@
 import logging
 from typing import TypedDict, Optional, NotRequired
 
-from sqlalchemy import Index, text
+from sqlalchemy import Index
 from sqlalchemy.orm import Mapped, mapped_column
 
 from dexorder.database.column import Address, Blockchain, Uint8
@@ -11,26 +11,12 @@ log = logging.getLogger(__name__)
 
 
 class TokenDict (TypedDict):
-    """
-    Token metadata dictionary
-
-    Fields:
-        a: The address of the token.
-        n: The name of the token.
-        s: The symbol of the token.
-        d: Number of decimals.
-        l: Indicates if approved ("listed").
-        g: gmx synthetic flag
-        x: Optional extra data.
-    """
-
     a: str
     n: str
     s: str
     d: int
-    l: NotRequired[bool]
-    g: NotRequired[bool]
-    x: NotRequired[dict]
+    w: Optional[bool]  # approved ("w"hitelisted)
+    x: NotRequired[dict]  # extra data
 
 
 # OldTokenDict is the primary dict we use in-memory, with basic JSON-able types
@@ -43,7 +29,6 @@ class OldTokenDict (TypedDict):
     symbol: str
     decimals: int
     approved: bool  # whether this token is in the whitelist or not
-    gmx_synthetic: NotRequired[bool]
     x: NotRequired[dict]  # extra data
 
 
@@ -58,7 +43,6 @@ class Token (Base):
     symbol: Mapped[str] = mapped_column(index=True)
     decimals: Mapped[Uint8]
     approved: Mapped[bool] = mapped_column(index=True)
-    gmx_synthetic: Mapped[bool] = mapped_column(default=False, server_default=text('false'))
 
     __table_args__ = (
         Index('ix_token_name', 'name', postgresql_using='gist'),  # full text search on name
@@ -69,13 +53,10 @@ class Token (Base):
     def load(token_dict: OldTokenDict) -> 'Token':
         return Token(chain=Blockchain.get(token_dict['chain']), address=token_dict['address'],
                      name=token_dict['name'], symbol=token_dict['symbol'], decimals=token_dict['decimals'],
-                     approved=token_dict['approved'], gmx_synthetic=token_dict.get('gmx_synthetic', False))
+                     approved=token_dict['approved'])
 
 
     def dump(self):
-        token = OldTokenDict(type='Token', chain=self.chain.chain_id, address=self.address,
+        return OldTokenDict(type='Token', chain=self.chain.chain_id, address=self.address,
                             name=self.name, symbol=self.symbol, decimals=self.decimals, approved=self.approved)
-        if self.gmx_synthetic:
-            token['gmx_synthetic'] = True
-        return token
 
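As an aside, the NotRequired marker used throughout these TypedDicts (PEP 655, in typing since Python 3.11) is what lets keys like x be omitted entirely rather than set to None; a minimal sketch with a hypothetical dict shape:

```python
from typing import Optional, TypedDict, NotRequired

class TokenDictSketch(TypedDict):
    a: str                # address
    d: int                # decimals
    w: Optional[bool]     # approved flag: key required, value may be None
    x: NotRequired[dict]  # extra data: the key itself may be absent

# Required keys must be present; NotRequired keys can simply be left out.
t: TokenDictSketch = {'a': '0xabc', 'd': 18, 'w': True}
t.setdefault('x', {})['source'] = 'example'
print(t)
```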
@@ -158,18 +158,10 @@ async def handle_uniswap_swap(swap: EventData):
         return
     pool, time, price = data
     addr = pool['address']
-    await update_pool_price(addr, time, price, pool['decimals'])
-    # log.debug(f'pool {addr} {minutely(time)} {price}')
-
-
-async def update_pool_price(addr, time, price, decimals):
-    """
-    Price should be an adjusted price with decimals, not the raw price from the pool. The decimals are used to
-    convert the price back to blockchain format for the triggers.
-    """
     pool_prices[addr] = price
     await ohlcs.update_all(addr, time, price)
-    update_price_triggers(addr, price, decimals)
+    await update_price_triggers(pool, price)
+    # log.debug(f'pool {addr} {minutely(time)} {price}')
 
 
 async def handle_vault_created(created: EventData):
@@ -242,10 +242,6 @@ class OHLCFileSeries:
         self.dirty_files = set()
         self.quote: Optional[tuple[datetime,dec]] = None
 
-    @property
-    def exists(self) -> bool:
-        return self.quote_file is not None or os.path.exists(self.quote_filename)
-
 
     @property
     def quote_filename(self):
@@ -280,16 +276,6 @@ class OHLCFileSeries:
         self.dirty_files.add(file)
 
 
-    # noinspection PyShadowingBuiltins
-    def update_ohlc(self, period: timedelta, time: datetime, open: dec, high: dec, low: dec, close: dec):
-        file = OHLCFile.get(self.base_dir, OHLCFilePath(self.symbol, period, time))
-        file.update(time, open)
-        file.update(time, high)
-        file.update(time, low)
-        file.update(time, close)
-        self.dirty_files.add(file)
-
-
     def _load(self, time):
         #
         # load quote file
@@ -373,25 +359,14 @@ class FinalOHLCRepository:
     """
     def __init__(self):
        assert config.ohlc_dir
-        self.dirty_series: set[OHLCFileSeries] = set()
+        self.dirty_series = set()
 
     def update(self, symbol: str, time: datetime, price: Optional[dec]):
-        series = self.get_series(symbol)
-        series.update(time, price)
-        self.dirty_series.add(series)
-
-    # noinspection PyShadowingBuiltins
-    def update_ohlc(self, symbol: str, period: timedelta, time: datetime, open: dec, high: dec, low: dec, close: dec):
-        series = self.get_series(symbol)
-        series.update_ohlc(period, time, open, high, low, close)
-        self.dirty_series.add(series)
-
-    @staticmethod
-    def get_series(symbol):
         chain_id = current_chain.get().id
         base_dir = os.path.join(config.ohlc_dir, str(chain_id))
         series = OHLCFileSeries.get(base_dir, symbol)
-        return series
+        series.update(time, price)
+        self.dirty_series.add(series)
 
     def flush(self) -> None:
         for series in self.dirty_series:
@@ -403,6 +378,3 @@ class FinalOHLCRepository:
             closing.file.close()
         # noinspection PyProtectedMember
         OHLCFile._closing.clear()
-
-    def has_symbol(self, symbol: str):
-        return self.get_series(symbol).exists
@@ -1,5 +0,0 @@
-from ._base import GMXPool, gmx_prices, gmx_tokens, gmx_pools
-from ._chaininfo import gmx_chain_info
-from ._handle import gmx_wire_runner_early, gmx_wire_runner_late
-from ._metadata import *
-
@@ -1,77 +0,0 @@
-import itertools
-import logging
-import re
-from datetime import datetime
-
-from eth_utils import keccak
-
-from dexorder import dec, from_timestamp
-from dexorder.util import hexbytes, hexstr
-from dexorder.util.abiencode import abi_decoder
-
-log = logging.getLogger(__name__)
-
-def no_ws(s):
-    return re.sub(r"\s+", "", s)
-
-EventLogDataType = '''
-(((string,address)[],(string,address[])[]),
- ((string,uint256)[],(string,uint256[])[]),
- ((string,int256)[], (string,int256[])[] ),
- ((string,bool)[], (string,bool[])[] ),
- ((string,bytes32)[],(string,bytes32[])[]),
- ((string,bytes)[], (string,bytes[])[] ),
- ((string,string)[], (string,string[])[] )
-)'''
-
-EventLogType = f'EventLog( address, string, string, {EventLogDataType} )'
-EventLog1Type = f'EventLog1( address, string, string, bytes32, {EventLogDataType} )'
-EventLog2Type = f'EventLog2( address, string, string, bytes32, bytes32, {EventLogDataType} )'
-
-EventLogTopic = hexstr(keccak(text=no_ws(EventLogType)))
-EventLog1Topic = hexstr(keccak(text=no_ws(EventLog1Type)).hex())
-EventLog2Topic = hexstr(keccak(text=no_ws(EventLog2Type)).hex())
-
-
-def topic_hash(signature):
-    return hexstr(keccak(text=no_ws(signature)))
-
-
-def parse_event_log_data(event_log_data):
-    """Parse GMX event log data into a structured dictionary.
-
-    Args:
-        event_log_data: Raw event log data tuple containing address items, uint items,
-            int items, bool items, bytes32 items, bytes items and string items
-
-    Returns:
-        dict: Parsed event data with typed values
-    """
-    if type(event_log_data) is str:
-        event_log_data = hexbytes(event_log_data)
-    sender, event_name, event_log_data = abi_decoder.decode(('address', 'string', no_ws(EventLogDataType),), event_log_data)
-
-    result = {'sender': sender, 'event': event_name}
-    for items, array_items in event_log_data:
-        for k, v in items:
-            result[k] = v
-        for k, v in array_items:
-            result[k] = v
-    return result
-
-
-class OracleEvent:
-
-    def __init__(self, event_log):
-        event_log_data = event_log['...']
-        data = parse_event_log_data(event_log_data)
-        self.token: str = data['token']
-        self.provider: str = data['provider']
-        self.min_price: dec = dec(data['minPrice'])
-        self.max_price: dec = dec(data['maxPrice'])
-        self.time: datetime = from_timestamp(data['timestamp'])
-
-if __name__ == '__main__':
-    print(EventLogTopic)
-    print(EventLog1Topic)
-    print(EventLog2Topic)
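The deleted _abi module derives GMX event topics by keccak-hashing the whitespace-stripped event signature. That hashing step in isolation, as a runnable sketch (using eth_utils.keccak, as the module did):

```python
import re
from eth_utils import keccak

def no_ws(s: str) -> str:
    return re.sub(r"\s+", "", s)

def topic_hash(signature: str) -> str:
    # topic0 of an EVM log is keccak256 of the canonical, whitespace-free signature
    return '0x' + keccak(text=no_ws(signature)).hex()

# Sanity check against the well-known ERC-20 Transfer topic:
print(topic_hash('Transfer(address, address, uint256)'))
# 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
```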
@@ -1,69 +0,0 @@
-import logging
-from dataclasses import dataclass
-
-import requests
-
-from ._chaininfo import GMX_API_BASE_URLS
-from .. import dec
-from ..addrmeta import address_metadata
-from ..base.chain import current_chain
-from ..blockstate import BlockDict
-from ..database.model.token import OldTokenDict
-from ..util import json
-
-log = logging.getLogger(__name__)
-
-
-@dataclass
-class GMXPool:
-    chain_id: int
-    address: str
-    index_token: str
-    long_token: str
-    short_token: str
-    min_collateral: dec
-    disabled: bool = False
-
-    @property
-    def max_leverage(self):
-        return round(1/self.min_collateral)
-
-    @property
-    def is_enabled(self):
-        return not self.disabled
-
-    def __bool__(self):
-        return self.is_enabled
-
-    def __str__(self):
-        # noinspection PyTypedDict
-        name = address_metadata[self.index_token]['symbol'] if self.index_token in address_metadata else self.index_token
-        # return f'GMX:{name}'
-        def t(addr):
-            # noinspection PyTypedDict
-            return address_metadata[addr]['symbol'] if addr in address_metadata and address_metadata[addr] else addr
-        return f'GMX:{self.address} ({t(self.index_token)}) {t(self.long_token)}-{t(self.short_token)} {self.max_leverage}x'
-
-    def __repr__(self):
-        return str(self)
-
-    @staticmethod
-    def load(s: str):
-        d = json.loads(s)
-        return GMXPool(d['chain_id'], d['address'], d['index_token'], d['long_token'], d['short_token'], dec(d['min_collateral']))
-
-
-GMX_API_BASE_URL = None
-
-def gmx_api(method, **params):
-    global GMX_API_BASE_URL
-    if GMX_API_BASE_URL is None:
-        GMX_API_BASE_URL = GMX_API_BASE_URLS[current_chain.get().id]
-    return requests.get(GMX_API_BASE_URL+method, params=params).json()
-
-
-
-gmx_tokens: BlockDict[str, OldTokenDict] = BlockDict('gmx_t', redis=True)
-gmx_pools: BlockDict[str, GMXPool] = BlockDict('gmx_m', redis=True, str2value=GMXPool.load)
-gmx_prices: BlockDict[str, dec] = BlockDict('gmx_p', redis=True, str2value=dec)
-
@@ -1,15 +0,0 @@
-import logging
-
-log = logging.getLogger(__name__)
-
-gmx_chain_info = {
-    42161: {
-        'EventEmitter': '0xC8ee91A54287DB53897056e12D9819156D3822Fb',
-        'DataStore': '0xFD70de6b91282D8017aA4E741e9Ae325CAb992d8',
-        'Reader': '0x0537C767cDAC0726c76Bb89e92904fe28fd02fE1',
-    }
-}
-
-GMX_API_BASE_URLS={
-    42161: 'https://arbitrum-api.gmxinfra.io/'
-}
@@ -1,15 +0,0 @@
-import logging
-from functools import cache
-
-from dexorder.contract import ContractProxy
-from dexorder.util import json
-
-log = logging.getLogger(__name__)
-
-
-@cache
-def get_gmx_contract(name: str):
-    with open(f'./resource/abi/42161/gmx/{name}.json') as file:
-        info = json.load(file)
-    return ContractProxy(info['address'], abi=info['abi'])
-
@@ -1,18 +0,0 @@
-import logging
-
-from eth_utils import keccak
-
-from dexorder.util.abiencode import abi_encoder
-
-log = logging.getLogger(__name__)
-
-def combo_key(key_str, arg, arg_type='address'):
-    key_bytes = keccak(abi_encoder.encode(['string'], [key_str]))
-    return keccak(abi_encoder.encode(['bytes32', arg_type], [key_bytes, arg]))
-
-IS_MARKET_DISABLED_KEY = 'IS_MARKET_DISABLED'
-MIN_COLLATERAL_FACTOR_KEY = 'MIN_COLLATERAL_FACTOR'
-
-
-async def is_market_disabled(ds, market):
-    return await ds.getBool(combo_key(IS_MARKET_DISABLED_KEY, market.address))
@@ -1,227 +0,0 @@
-import asyncio
-import logging
-from datetime import timedelta
-
-from ._abi import parse_event_log_data
-from ._base import gmx_api
-from ._chaininfo import gmx_chain_info
-from ._metadata import gmx_update_metadata, gmx_token_symbol_map
-from .. import dec, from_timestamp
-from ..event_handler import update_pool_price
-from ..final_ohlc import FinalOHLCRepository
-from ..ohlc import period_name
-from ..periodic import periodic
-from ..progressor import BlockProgressor
-from ..tokens import get_token
-
-log = logging.getLogger(__name__)
-
-
-def gmx_wire_runner_early(runner: BlockProgressor, backfill: FinalOHLCRepository=None):
-    runner.add_event_trigger(gmx_update_metadata)
-    runner.add_event_trigger(create_backfill_handler(backfill) if backfill else gmx_handle_price_update)
-    runner.add_event_trigger(handle_gmx_events, log_filter={'address':gmx_chain_info[42161]['EventEmitter'], })
-
-
-def gmx_wire_runner_late(runner: BlockProgressor):
-    pass
-
-
-def handle_marketpoolvalueupdated_event(event: dict):
-    # log.info(f'marketpoolvalueupdated: {event}')
-    # {
-    #     'sender': '0x3f6df0c3a7221ba1375e87e7097885a601b41afc',
-    #     'event': 'MarketPoolValueUpdated',
-    #     'market': '0x3680d7bfe9260d3c5de81aeb2194c119a59a99d1',
-    #     'longTokenAmount': 19476307269091870091,
-    #     'shortTokenAmount': 50700920349,
-    #     'longTokenUsd': 53158489854101648408924993481922790,
-    #     'shortTokenUsd': 50693714261673382360507024950000000,
-    #     'totalBorrowingFees': 41601639378800206440170070581268,
-    #     'borrowingFeePoolFactor': 630000000000000000000000000000,
-    #     'impactPoolAmount': 3152510497,
-    #     'marketTokensSupply': 93026785041268146396614,
-    #     'poolValue': 102896287811364604212637168767403111,
-    #     'longPnl': 142313435006510688238425736783147,
-    #     'shortPnl': -19431172516799270439150252797270,
-    #     'netPnl': 122882262489711417799275483985877,
-    #     'actionType': b'`y\x91\xfcYc\xe2d\xf1\xa9O\xaa\x12lcH/\xdcZ\xf1Jeo\x08u\x1d\xc8\xb0\xc5\xd4v0',
-    #     'tradeKey': b'Z]Qws\xaf\x115\x89\x85\xdd\xce)\x93t\xc4C\xccx{\x85N\xa0\x17B\x99r#\xd9~\x94\xd1'
-    # }
-
-    pass
-
-
-event_handlers = {
-    'OraclePriceUpdate': None,
-
-    'MarketPoolValueInfo': None,
-    'MarketPoolValueUpdated': handle_marketpoolvalueupdated_event,
-
-    'OrderCreated': None,
-    'OrderUpdated': None,
-    'OrderCancelled': None,
-    'OrderExecuted': None,
-    'OrderCollateralDeltaAmountAutoUpdated': None,
-
-    'PositionIncrease': None,
-    'PositionDecrease': None,
-    'PositionFeesCollected': None,
-
-    'OpenInterestInTokensUpdated': None,
-    'OpenInterestUpdated': None,
-
-    'CollateralSumUpdated': None,
-
-    'ClaimableFeeAmountUpdated': None,
-    'ClaimableFundingUpdated': None,
-    'ClaimableFundingAmountPerSizeUpdated': None,
-    'FundingFeesClaimed': None,
-
-    'PoolAmountUpdated': None,
-
-    'VirtualSwapInventoryUpdated': None,
-    'SwapInfo': None,
-    'SwapFeesCollected': None,
-
-    'SwapImpactPoolAmountUpdated': None,
-    'PositionImpactPoolAmountUpdated': None,
-    'VirtualPositionInventoryUpdated': None,
-
-    'CumulativeBorrowingFactorUpdated': None,
-
-    'KeeperExecutionFee': None,
-    'ExecutionFeeRefund': None,
-    'FundingFeeAmountPerSizeUpdated': None,
-
-    'SetUint': None,
-    # SetBytes32 presumably and others...
-    'SyncConfig': None,
-
-    'DepositCreated': None,
-    'DepositExecuted': None,
-    'WithdrawalCreated': None,
-    'WithdrawalExecuted': None,
-    'ShiftCreated': None,
-    'ShiftExecuted': None,
-
-    'GlvValueUpdated': None,
-    'GlvDepositCreated': None,
-    'GlvDepositExecuted': None,
-    'GlvWithdrawalCreated': None,
-    'GlvWithdrawalExecuted': None,
-    'GlvShiftCreated': None,
-    'GlvShiftExecuted': None,
-
-    'AffiliateRewardUpdated': None,
-    'IncrementSubaccountActionCount': None,
-
-}
-
-
-async def handle_gmx_events(events: list[dict]):
-    for event in events:
-        data = parse_event_log_data(event['data'])
-        event_name = data['event']
-        try:
-            event_handlers[event_name](data)
-        except KeyError:
-            log.debug(f'Unknown event {event_name}')
-        except TypeError:
-            pass
-
-
-initialized = False
-
-@periodic(timedelta(hours=1))
-async def gmx_handle_metadata_update():
-    global initialized
-    # noinspection PyBroadException
-    try:
-        await gmx_update_metadata()
-        initialized = True
-    except:
-        if not initialized:
-            raise
-        log.exception('Exception in gmx_handle_metadata_update()')
-
-
-@periodic(timedelta(seconds=1))
-async def gmx_handle_price_update():
-    updates = await fetch_price_updates()
-    # ticker updates have only one price per addr so we can parallelize setting prices
-    await asyncio.gather(*[update_pool_price(addr, time, price, 30) for addr, time, price in updates])
-
-
-def create_backfill_handler(ohlcs: FinalOHLCRepository):
-
-    @periodic(timedelta(seconds=1))
-    async def gmx_handle_price_update_with_backfill():
-        updates = await fetch_price_updates()
-        backfill_addrs = [addr for addr, time, price in updates if not ohlcs.has_symbol(addr)]
-
-        backfill_addrs = [backfill_addrs[0]]  # todo remove debug
-
-        if backfill_addrs:
-            log.info(f'Backfilling {len(backfill_addrs)} new GMX tokens')
-            await asyncio.gather(*[backfill_token(ohlcs, a) for a in backfill_addrs])
-
-        for addr, time, price in updates:
-            ohlcs.update(addr, time, price)
-
-    return gmx_handle_price_update_with_backfill
-
-
-def push_candle(ohlcs, addr, period, candle):
-    time, *prices = candle
-    time = from_timestamp(time)
-    prices = [dec(p) for p in prices]
-    ohlcs.update_ohlc(addr, period, time, *prices)
-
-
-GMX_OHLC_PERIODS = [
-    timedelta(minutes=1),
-    timedelta(minutes=5),
-    timedelta(minutes=15),
-    timedelta(hours=1),
-    timedelta(hours=4),
-    timedelta(days=1),
-]
-
-async def backfill_token(ohlcs: FinalOHLCRepository, addr: str):
-    token = await get_token(addr)
-    addr = token['address']
-    for period in GMX_OHLC_PERIODS:
-        # Polling a large window is the only history method GMX provides :( It's also how their web client works!
-        symbol = gmx_token_symbol_map[addr]
-        interval = period_name(period).lower()
-        response = gmx_api('prices/candles', tokenSymbol=symbol, period=interval, limit=10_000)
-        if 'error' in response:
-            if not response['error'].startswith('unsupported period'):
-                log.warning(f'Could not query token backfill for {token["symbol"]}: {response["error"]}')
-        else:
-            for c in reversed(response['candles']):
-                push_candle(ohlcs, addr, period, c)
-    log.info(f'Backfilled new GMX token {token["symbol"]}')
-
-
-async def fetch_price_updates():
-    updates = []
-    for t in gmx_api('prices/tickers'):
-        """
-        {
-            "tokenAddress": "0x3Eea56A1ccCdbfB70A26aD381C71Ee17E4c8A15F",
-            "tokenSymbol": "BOME",
-            "minPrice": "1621019778803375000000",
-            "maxPrice": "1621534421901125000000",
-            "updatedAt": 1749849326251,
-            "timestamp": 1749849325
-        },
-        """
-        addr = t['tokenAddress']
-        # GMX prices use 30 decimal places
-        price = (dec(t['minPrice']) + dec(t['maxPrice'])) / 2 * dec(10) ** dec(-30)
-        time = from_timestamp(t['timestamp'])
-        updates.append((addr, time, price))
-    return updates
-
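For reference, the deleted fetch_price_updates() reduces each GMX ticker to a mid price and unscales it by 10**30; that conversion in isolation, mirroring the deleted code's Decimal arithmetic:

```python
from decimal import Decimal as dec

ticker = {
    "tokenAddress": "0x3Eea56A1ccCdbfB70A26aD381C71Ee17E4c8A15F",
    "minPrice": "1621019778803375000000",
    "maxPrice": "1621534421901125000000",
}

# Average the oracle's min/max bounds, then strip GMX's 30-decimal scaling.
mid = (dec(ticker["minPrice"]) + dec(ticker["maxPrice"])) / 2 * dec(10) ** dec(-30)
print(mid)  # ~1.62127710035225E-9
```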
@@ -1,94 +0,0 @@
-__all__ = ['gmx_update_metadata', 'gmx_token_symbol_map']
-
-import asyncio
-import logging
-import re
-from copy import copy
-from pprint import pprint
-
-from dexorder import dec, ADDRESS_0
-from dexorder.addrmeta import address_metadata
-from dexorder.base.chain import current_chain
-from dexorder.database.model.token import OldTokenDict
-from dexorder.gmx._base import GMXPool, gmx_tokens, gmx_pools, gmx_api
-from dexorder.gmx._contract import get_gmx_contract
-from dexorder.gmx._datastore import combo_key, MIN_COLLATERAL_FACTOR_KEY, is_market_disabled
-from dexorder.tokens import get_token
-
-log = logging.getLogger(__name__)
-
-
-async def gmx_update_metadata():
-    await gmx_detect_tokens()
-    await gmx_detect_pools()
-
-
-async def gmx_detect_tokens():
-    chain_id = current_chain.get().id
-    response = gmx_api('tokens')
-    if 'tokens' not in response:
-        raise ValueError('No tokens in GMX response')
-
-    tokens = []
-    for info in response['tokens']:
-        synthetic = info.get('synthetic',False)
-        name = f'GMX {info["symbol"]}'
-        if synthetic:
-            name += ' Synthetic'
-        token = OldTokenDict(type='Token', chain=chain_id, address=info['address'], name=name,
-                             symbol=info['symbol'], decimals=info['decimals'], approved=True, gmx_synthetic=synthetic)
-        if re.search(r'deprecated', info['symbol'], re.IGNORECASE):
-            continue
-        gmx_token_symbol_map[token['address']] = token['symbol']
-        if synthetic and token['address'] not in address_metadata:
-            # noinspection PyTypeChecker
-            address_metadata[info['address']] = token
-        gmx_tokens[token['address']] = token
-        tokens.append(token)
-
-    # Delete any tokens that are no longer listed
-    valid_addrs = set(t['address'] for t in tokens)
-    invalid_addrs = set(md['address'] for md in gmx_tokens.values() if md['address'] not in valid_addrs)
-    for addr in invalid_addrs:
-        log.info(f'Deleting invalid GMX token {gmx_tokens[addr]}')
-        del gmx_tokens[addr]
-
-
-async def gmx_detect_pools():
-    ds = get_gmx_contract('DataStore')
-    reader = get_gmx_contract('Reader')
-    market_info = await reader.getMarkets(ds.address, 0, 1000)
-    markets = [
-        GMXPool(42161, market_token, index_token, long_token, short_token, dec('nan'))
-        for market_token, long_token, short_token, index_token in market_info
-    ]
-    # some pools have ADDRESS_0 as the long token. wat?
-    markets = [m for m in markets if m.long_token != ADDRESS_0 and m.short_token != ADDRESS_0 and m.index_token != ADDRESS_0 and m.address != ADDRESS_0]
-    market_disabled = await asyncio.gather(*[is_market_disabled(ds, m) for m in markets])
-    markets = [m for m,d in zip(markets, market_disabled) if not d]
-    valid_addrs = set(m.address for m in markets)
-    invalid_addrs = set(p.address for p in gmx_pools.values() if p.address not in valid_addrs)
-    new_markets = [m for m in markets if m.address not in gmx_pools]
-
-    async def init_market(m):
-        result = await ds.getUint(combo_key(MIN_COLLATERAL_FACTOR_KEY, m.address))
-        if result == 0:
-            # raise ValueError(f'no min collateral factor for market {m.address}')
-            log.warning(f'no min collateral factor for market {m.address}')
-        m.min_collateral = 2 * dec(result) / dec(1e30)
-        gmx_pools[m.address] = m
-    await asyncio.gather(*[init_market(m) for m in new_markets])
-    token_addrs = set(t for m in new_markets for t in (m.address, m.index_token, m.long_token, m.short_token))
-    await asyncio.gather(*[get_token(t) for t in token_addrs])
-
-    # Disable any markets that are not longer valid
-    for addr in invalid_addrs:
-        log.info(f'Disabling GMX market {gmx_pools[addr]}')
-        updated = copy(gmx_pools[addr])
-        updated.disabled = True
-        gmx_pools[addr] = updated
-
-    pprint(new_markets)
-    return markets
-
-gmx_token_symbol_map: dict[str, str] = {}  # maps addresses to GMX token symbols
@@ -164,7 +164,7 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti
     returns an ordered list of OHLC's that have been created/modified by the new time/price
     if price is None, then bars are advanced based on the time but no new price is added to the series.
     """
-    log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
+    # log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
     cur = prev
     if time < cur.start:
         # data corruption. just shut down
@@ -177,13 +177,13 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti
             break
         result.append(cur)
         cur = NativeOHLC(end, None, None, None, cur.close)
-    log.debug(f'\ttime advancements: {result}')
+    # log.debug(f'\ttime advancements: {result}')
     # if we are setting a price, update the current bar
     if price is not None:
         cur.update(price)
         result.append(cur)
-        log.debug(f'\tappended current bar: {cur.ohlc}')
-    log.debug(f'\tupdate result: {result}')
+        # log.debug(f'\tappended current bar: {cur.ohlc}')
+    # log.debug(f'\tupdate result: {result}')
     return result
 
 class OHLCKey (NamedTuple):
@@ -342,17 +342,11 @@ class OHLCRepository:
     def add_symbol(symbol: str, period: timedelta = None):
         if period is not None:
-            if (symbol, period) not in recent_ohlcs:
-                recent_ohlcs[OHLCKey(symbol, period)] = []  # setting an empty value will initiate price capture
+            recent_ohlcs[(symbol, period)] = []  # setting an empty value will initiate price capture
         else:
             for period in OHLC_PERIODS:
-                if (symbol, period) not in recent_ohlcs:
-                    recent_ohlcs[OHLCKey(symbol, period)] = []
-
-
-    @staticmethod
-    def has_symbol(symbol: str, period: timedelta):
-        return OHLCKey(symbol, period) in recent_ohlcs
+                recent_ohlcs[(symbol, period)] = []
 
     async def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True):
         """ the update_all() and update() methods generate bars for the recent_ohlcs BlockDict """
@@ -370,7 +364,7 @@ class OHLCRepository:
         # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
         if price is not None:
             self.quotes[symbol] = timestamp(time), str(price)
-        key = OHLCKey(symbol, period)
+        key = symbol, period
         # recent_ohlcs holds a list of "recent" NativeOHLC's stored as blockdata. we try to keep the recent array long
         # enough to extend prior the root block time
         historical: Optional[list[NativeOHLC]] = recent_ohlcs.get(key)
@@ -3,7 +3,6 @@ from dataclasses import dataclass
 from typing import Optional, Union, Any
 from uuid import UUID
 
-from triton.profiler import deactivate
 from web3.exceptions import ContractPanicError, ContractLogicError
 from web3.types import EventData
 
@@ -111,10 +111,10 @@ async def update_balance_triggers(vault: str, token: str):
     await asyncio.gather(*updates)
 
 
-def update_price_triggers(addr: str, price: dec, decimals: int):
-    price = price * dec(10) ** dec(-decimals)  # adjust for pool decimals to get onchain price
+async def update_price_triggers(pool: OldPoolDict, price: dec):
+    price = price * dec(10) ** dec(-pool['decimals'])  # adjust for pool decimals to get onchain price
     price = float(price)  # since we use SIMD operations to evaluate lines, we must convert to float
-    for pt in PriceLineTrigger.by_pool.get(addr, []):
+    for pt in PriceLineTrigger.by_pool.get(pool['address'], []):
         pt.update(price)
 
 
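The float(price) conversion above exists because trigger lines are evaluated in bulk. A hedged sketch of what vectorized line evaluation can look like (numpy arrays of slopes and intercepts are an illustration, not the project's actual PriceLineTrigger internals):

```python
import numpy as np

# Hypothetical storage: one row per trigger line, level(t) = m*t + b.
slopes = np.array([0.5, -1.2, 0.0])
intercepts = np.array([100.0, 250.0, 180.0])

def lines_crossed(t: float, price: float) -> np.ndarray:
    # One vectorized (SIMD-friendly) pass over every line; Decimal would
    # force a slow per-object Python loop, hence the float conversion.
    levels = slopes * t + intercepts
    return price >= levels

print(lines_crossed(10.0, 182.0))  # [ True False  True]
```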
@@ -1,36 +0,0 @@
-from datetime import timedelta
-import time
-import asyncio
-from functools import wraps
-
-def periodic(period: timedelta|float):
-    """
-    Decorator to allow only one execution of a function or coroutine per period.
-    Works for both sync and async functions.
-    """
-    def decorator(func):
-        last_called = {'time': 0.}
-        period_seconds = period.total_seconds() if isinstance(period, timedelta) else period
-
-        @wraps(func)
-        def sync_wrapper(*args, **kwargs):
-            now = time.monotonic()
-            if now - last_called['time'] >= period_seconds:
-                last_called['time'] = now
-                return func(*args, **kwargs)
-            return None
-
-        @wraps(func)
-        async def async_wrapper(*args, **kwargs):
-            now = time.monotonic()
-            if now - last_called['time'] >= period_seconds:
-                last_called['time'] = now
-                return await func(*args, **kwargs)
-            return None
-
-        if asyncio.iscoroutinefunction(func):
-            return async_wrapper
-        else:
-            return sync_wrapper
-
-    return decorator
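For reference, the deleted decorator's behavior: calls arriving inside the period are silently dropped, so it rate-limits rather than schedules. A usage sketch, assuming the periodic() definition shown above:

```python
import asyncio
from datetime import timedelta

@periodic(timedelta(seconds=1))
async def poll():
    print('polled')

async def main():
    await poll()              # runs
    await poll()              # suppressed: same 1-second window
    await asyncio.sleep(1.1)
    await poll()              # runs again

asyncio.run(main())
```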
@@ -40,8 +40,8 @@ class BlockProgressor(metaclass=ABCMeta):
     def add_event_trigger(self,
                           # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
                           callback: Union[
-                              Callable[[EventData|dict], Maywaitable[None]],
-                              Callable[[list[EventData|dict]], Maywaitable[None]],
+                              Callable[[EventData], Maywaitable[None]],
+                              Callable[[list[EventData]], Maywaitable[None]],
                               Callable[[], Maywaitable[None]],
                           ],
                           event: ContractEvents = None,
@@ -316,7 +316,7 @@ class BlockStateRunner(BlockProgressor):
             else:
                 lf = dict(log_filter)
                 lf['blockHash'] = hexstr(block.hash)
-                has_logs = 'topics' not in lf or any(bytes(hexbytes(topic)) in bloom for topic in lf['topics'])
+                has_logs = any(bytes(hexbytes(topic)) in bloom for topic in lf['topics'])
                 # log.debug(f'has {event.__class__.__name__}? {has_logs}')
                 if not has_logs:
                     get_logs = None
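The bloom pre-check being simplified above relies on the block's logsBloom: bloom filters never produce false negatives, so a miss proves no matching log exists and the eth_getLogs call can be skipped. A small sketch of that property (assuming the eth-bloom package):

```python
from eth_bloom import BloomFilter
from eth_utils import keccak

bloom = BloomFilter()
transfer_topic = keccak(text='Transfer(address,address,uint256)')
bloom.add(transfer_topic)

# No false negatives: a miss means the block definitely lacks the topic.
print(transfer_topic in bloom)                                    # True
print(keccak(text='Approval(address,address,uint256)') in bloom)  # False (with high probability)
```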
@@ -52,7 +52,6 @@ async def get_token(address) -> Optional[OldTokenDict]:
         # noinspection PyTypeChecker
         return address_metadata[address]
     except KeyError:
-        # noinspection PyTypeChecker
         result = address_metadata[address] = await load_token(address)
         return result
 