diff --git a/alembic/script.py.mako b/alembic/script.py.mako index f6d679d..8b838fa 100644 --- a/alembic/script.py.mako +++ b/alembic/script.py.mako @@ -10,6 +10,7 @@ from typing import Sequence, Union from alembic import op import sqlalchemy as sa import dexorder.database +import dexorder.database.column_types ${imports if imports else ""} # revision identifiers, used by Alembic. diff --git a/alembic/versions/db62e7db828d_initial_schema.py b/alembic/versions/db62e7db828d_initial_schema.py index b47c7f3..b9dcae9 100644 --- a/alembic/versions/db62e7db828d_initial_schema.py +++ b/alembic/versions/db62e7db828d_initial_schema.py @@ -10,6 +10,7 @@ from typing import Sequence, Union from alembic import op import sqlalchemy as sa import dexorder.database +import dexorder.database.column_types from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. @@ -47,14 +48,14 @@ def upgrade() -> None: sa.PrimaryKeyConstraint('chain', 'series', 'key') ) op.create_table('transactionjob', - sa.Column('id', sa.UUID(), nullable=False), - sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), - sa.Column('height', sa.Integer(), nullable=False), - # sa.Column('state', sa.Enum(name='transactionjobstate'), nullable=False), - sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False), - sa.Column('request', dexorder.database.column_types._DataclassDict(astext_type=sa.Text()), nullable=False), - sa.PrimaryKeyConstraint('id') - ) + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('height', sa.Integer(), nullable=False), + # sa.Column('state', sa.Enum(name='transactionjobstate'), nullable=False), + sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False), + sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False), + sa.PrimaryKeyConstraint('id') + ) op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False) op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False) op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False) @@ -73,9 +74,19 @@ def upgrade() -> None: sa.Column('state', sa.Enum('Open', 'Canceled', 'Filled', 'Expired', 'Underfunded', name='swaporderstate'), nullable=False), sa.PrimaryKeyConstraint('chain', 'vault', 'order_index') ) + op.create_table('pool', + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('address', dexorder.database.column_types.Address(), nullable=False), + sa.Column('exchange', sa.Enum('UniswapV2', 'UniswapV3', name='exchange'), nullable=False), + sa.Column('base', dexorder.database.column_types.Address(), nullable=False), + sa.Column('quote', dexorder.database.column_types.Address(), nullable=False), + sa.Column('fee', sa.Integer(), nullable=False), + sa.PrimaryKeyConstraint('chain', 'address') + ) def downgrade() -> None: + op.drop_table('pool') op.drop_table('orderindex') op.drop_table('seriesset') op.drop_table('seriesdict') diff --git a/src/dexorder/database/column_types.py b/src/dexorder/database/column_types.py index 9d96657..522f375 100644 --- a/src/dexorder/database/column_types.py +++ b/src/dexorder/database/column_types.py @@ -72,7 +72,7 @@ def Fixed(bits, dbits, signed=False): return result -class _DataclassDict(TypeDecorator): +class DataclassDictBase(TypeDecorator): impl = JSONB def process_bind_param(self, value, dialect): @@ -82,6 +82,6 @@ class _DataclassDict(TypeDecorator): return self.Constructor(**value) def DataclassDict(constructor): - result = _DataclassDict() + result = DataclassDictBase() result.Constructor = constructor return result diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index 6581392..cc39fcb 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -4,3 +4,4 @@ from .block import Block from .series import SeriesSet, SeriesDict from .transaction import Transaction, TransactionJob from .orderindex import OrderIndex +from .pool import Pool diff --git a/src/dexorder/database/model/pool.py b/src/dexorder/database/model/pool.py index 7ac88ad..aa141b6 100644 --- a/src/dexorder/database/model/pool.py +++ b/src/dexorder/database/model/pool.py @@ -8,7 +8,7 @@ from dexorder.database.model import Base log = logging.getLogger(__name__) -class PoolModel (Base): +class Pool (Base): __tablename__ = 'pool' chain: Mapped[Blockchain] = mapped_column(primary_key=True) @@ -16,4 +16,4 @@ class PoolModel (Base): exchange: Mapped[Exchange] base: Mapped[Address] quote: Mapped[Address] - fee: int # in millionths aka 100ths of a bip + fee: Mapped[int] # in millionths aka 100ths of a bip diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index eee4793..bf09c7e 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -9,11 +9,11 @@ from dexorder import current_pub, db, dec from dexorder.base.chain import current_chain from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey from dexorder.transaction import create_transactions, submit_transaction_request, handle_transaction_receipts, send_transactions -from dexorder.uniswap import uniswap_price +from dexorder.pools import uniswap_price from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract from dexorder.contract import get_contract_event, ERC20 from dexorder.data import vault_owners, vault_balances -from dexorder.pool import new_pool_prices, pool_prices +from dexorder.pools import new_pool_prices, pool_prices, pool_decimals, Pools from dexorder.database.model.block import current_block from dexorder.database.model.transaction import TransactionJob from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState @@ -192,7 +192,7 @@ async def handle_uniswap_swap(swap: EventData): except KeyError: return addr = swap['address'] - price: dec = await uniswap_price(addr, sqrt_price) + price: dec = await uniswap_price(await Pools.get(addr), sqrt_price) log.debug(f'pool {addr} {price}') pool_prices[addr] = price diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 95f2fec..f1925e9 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -5,12 +5,12 @@ from enum import Enum, auto from typing import Callable, Optional, Union, Awaitable from dexorder.blockstate import BlockSet, BlockDict -from dexorder.base.orderlib import SwapOrderState, PriceProof +from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST from dexorder.util import defaultdictk from .orderstate import Order from .. import dec from ..base.order import OrderKey, TrancheKey, ExecutionRequest -from ..pool import ensure_pool_price, Pool +from ..pools import ensure_pool_price, Pools, pool_decimals from ..database.model.block import current_block from ..routing import pool_address @@ -36,8 +36,8 @@ async def activate_order(order: Order): Call this to enable triggers on an order which is already in the state. """ address = pool_address(order.status.order) - await ensure_pool_price(address) - pool = await Pool.get(address) + pool = await Pools.get(address) + await ensure_pool_price(pool) inverted = pool.base != order.order.tokenIn if inverted: assert pool.base == order.order.tokenOut @@ -56,6 +56,8 @@ def intersect_ranges( a_low, a_high, b_low, b_high): async def line_passes(lc: tuple[float,float], is_min: bool, price: dec) -> bool: b, m = lc + if b == 0 and m == 0: + return True limit = m * current_block.get().timestamp + b # todo ratios # prices AT the limit get zero volume, so we only trigger on >, not >= @@ -81,24 +83,27 @@ class TrancheTrigger: tranche_remaining = tranche_amount - tranche_filled # time and price constraints - self.time_constraint = tranche.startTime, tranche.endTime + self.time_constraint = [tranche.startTime, tranche.endTime] if tranche.startTimeIsRelative: self.time_constraint[0] += self.order.status.start if tranche.endTimeIsRelative: self.time_constraint[1] += self.order.status.start + if self.time_constraint[0] <= DISTANT_PAST and self.time_constraint[1] >= DISTANT_FUTURE: + self.time_constraint = None self.min_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.minIntercept, tranche.minSlope) self.max_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.maxIntercept, tranche.maxSlope) self.has_line_constraint = any( a or b for a,b in (self.min_line_constraint, self.max_line_constraint)) if not tranche.marketOrder and inverted: self.min_line_constraint, self.max_line_constraint = self.max_line_constraint, self.min_line_constraint self.slippage = tranche.minIntercept if tranche.marketOrder else 0 + self.pool_price_multiplier = None # compute status and set relevant triggers if tranche_remaining <= 0: self.status = TrancheStatus.Filled return timestamp = current_block.get().timestamp - self.status = TrancheStatus.Early if timestamp < tranche.startTime else TrancheStatus.Expired if timestamp > tranche.endTime else TrancheStatus.Pricing + self.status = TrancheStatus.Early if timestamp < self.time_constraint[0] else TrancheStatus.Expired if timestamp > self.time_constraint[1] else TrancheStatus.Pricing self.enable_time_trigger() if self.status == TrancheStatus.Pricing: self.enable_price_trigger() @@ -146,6 +151,11 @@ class TrancheTrigger: if self.closed: log.debug(f'price trigger ignored because trigger status is {self.status}') return + if self.pool_price_multiplier is None: + pool = await Pools.get(pool_address(self.order.order)) + pool_dec = await pool_decimals(pool) + self.pool_price_multiplier = dec(10) ** dec(-pool_dec) + cur *= self.pool_price_multiplier if not self.has_line_constraint or all(await asyncio.gather( line_passes(self.min_line_constraint, True, cur), line_passes(self.max_line_constraint, False, cur))): diff --git a/src/dexorder/pool.py b/src/dexorder/pool.py deleted file mode 100644 index 6c8f420..0000000 --- a/src/dexorder/pool.py +++ /dev/null @@ -1,54 +0,0 @@ -import asyncio -import logging -from typing import Optional - -from dexorder import dec, db, current_w3 -from dexorder.base.chain import current_chain -from dexorder.base.orderlib import Exchange -from dexorder.blockstate import BlockDict -from dexorder.blockstate.blockdata import K, V -from dexorder.database.model.pool import PoolModel -from dexorder.uniswap import UniswapV3Pool - -log = logging.getLogger(__name__) - -class Pool: - instances: dict[tuple[int,str], PoolModel] = {} - - @staticmethod - async def get(address, chain=None) -> Optional[PoolModel]: - if not chain: - chain = current_chain.get() - key = (chain.chain_id, address) - found = Pool.instances.get(key) - if not found: - found = db.session.get(key) - if not found: - # todo other exchanges - t0, t1, fee = await asyncio.gather(UniswapV3Pool(address).token0(), UniswapV3Pool(address).token1(), UniswapV3Pool(address).fee()) - found = PoolModel(chain=chain, address=address, exchange=Exchange.UniswapV3, base=t0, quote=t1, fee=fee) - db.session.add(found) - Pool.instances[key] = found - return found - - -class PoolPrices (BlockDict[str, dec]): - def __setitem__(self, item: K, value: V) -> None: - super().__setitem__(item, value) - new_pool_prices[item] = value - - -def pub_pool_price(k,v): - chain_id = current_chain.get().chain_id - return f'{chain_id}|{k}', 'p', (chain_id, k, str(v)) - - -new_pool_prices: dict[str, dec] = {} # tracks which prices were set during the current block. cleared every block. -pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, pub=pub_pool_price, value2str=lambda d: f'{d:f}', str2value=dec) - - -async def ensure_pool_price(pool_addr): - if pool_addr not in pool_prices: - log.debug(f'querying price for pool {pool_addr}') - pool_prices[pool_addr] = await UniswapV3Pool(pool_addr).price() - diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py new file mode 100644 index 0000000..0b6492e --- /dev/null +++ b/src/dexorder/pools.py @@ -0,0 +1,88 @@ +import asyncio +import logging +from typing import Optional + +from dexorder import dec, db +from dexorder.base.chain import current_chain +from dexorder.base.orderlib import Exchange +from dexorder.blockstate import BlockDict +from dexorder.blockstate.blockdata import K, V +from dexorder.contract.decimals import token_decimals +from dexorder.database.model.pool import Pool +from dexorder.uniswap import UniswapV3Pool + +log = logging.getLogger(__name__) + +class Pools: + instances: dict[tuple[int,str], Pool] = {} + + @staticmethod + async def get(address, *, chain=None) -> Optional[Pool]: + if not chain: + chain = current_chain.get() + key = (chain, address) + found = Pools.instances.get(key) + if not found: + found = db.session.get(Pool, key) + if not found: + # todo other exchanges + t0, t1, fee = await asyncio.gather(UniswapV3Pool(address).token0(), UniswapV3Pool(address).token1(), UniswapV3Pool(address).fee()) + found = Pool(chain=chain, address=address, exchange=Exchange.UniswapV3, base=t0, quote=t1, fee=fee) + db.session.add(found) + Pools.instances[key] = found + return found + + +class PoolPrices (BlockDict[str, dec]): + def __setitem__(self, item: K, value: V) -> None: + super().__setitem__(item, value) + new_pool_prices[item] = value + + +def pub_pool_price(k,v): + chain_id = current_chain.get().chain_id + return f'{chain_id}|{k}', 'p', (chain_id, k, str(v)) + + +new_pool_prices: dict[str, dec] = {} # tracks which prices were set during the current block. cleared every block. +pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, pub=pub_pool_price, value2str=lambda d: f'{d:f}', str2value=dec) + + + +async def uniswap_price(pool, sqrt_price=None) -> dec: + if sqrt_price is None: + sqrt_price = (await UniswapV3Pool(pool.address).slot0())[0] + pool_dec = await pool_decimals(pool) + price = dec(sqrt_price*sqrt_price) / 2 ** (96 * 2) + result = price * dec(10) ** dec(pool_dec) + log.debug(f'pool sqrtX96 {sqrt_price} with {pool_dec} decimals = {result}') + return result + + +async def ensure_pool_price(pool): + # todo refactor to accept a Pool and switch on exchange type + if pool not in pool_prices: + log.debug(f'querying price for pool {pool.address}') + if pool.exchange == Exchange.UniswapV3: + pool_prices[pool.address] = await uniswap_price(pool) + else: + raise ValueError + + +_pool_decimals = {} +async def pool_decimals(pool): + found = _pool_decimals.get(pool) + if found is None: + key = f'pd|{pool.address}' + try: + found = db.kv[key] + log.debug('got decimals from db') + except KeyError: + found = _pool_decimals[pool] = await token_decimals(pool.base) - await token_decimals(pool.quote) + decimals0 = await token_decimals(pool.base) + decimals1 = await token_decimals(pool.quote) + decimals = decimals0 - decimals1 + db.kv[key] = decimals + log.debug(f'pool decimals: {decimals0} - {decimals1}') + log.debug(f'pool decimals {pool.address} {found}') + return found diff --git a/src/dexorder/uniswap.py b/src/dexorder/uniswap.py index f82275f..d732bec 100644 --- a/src/dexorder/uniswap.py +++ b/src/dexorder/uniswap.py @@ -1,12 +1,10 @@ from charset_normalizer.md import getLogger from eth_utils import keccak, to_bytes, to_checksum_address -from dexorder import dec, db from dexorder.base.chain import Ethereum, Polygon, Goerli, Mumbai, Arbitrum, Mock, Alpha from dexorder.blockchain import ByBlockchainDict from dexorder.contract import ContractProxy from dexorder.util.abiencode import abi_encoder -from dexorder.contract.decimals import token_decimals from dexorder.util import hexbytes UNISWAPV3_POOL_INIT_CODE_HASH = hexbytes('0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54') @@ -48,32 +46,6 @@ def uniswap_pool_address(factory_addr: str, addr_a: str, addr_b: str, fee: int) return result -async def uniswap_price(addr, sqrt_price) -> dec: - price = dec(sqrt_price*sqrt_price) / 2 ** (96 * 2) - decimals = await pool_decimals(addr) - result = price * dec(10) ** dec(decimals) - log.debug(f'pool sqrtX96 {sqrt_price} with {decimals} decimals = {result}') - return result - - -async def pool_decimals(addr): - key = f'pd|{addr}' - try: - decimals = db.kv[key] - log.debug('got decimals from db') - except KeyError: - pool = UniswapV3Pool(addr) - token0 = await pool.token0() - token1 = await pool.token1() - decimals0 = await token_decimals(token0) - decimals1 = await token_decimals(token1) - decimals = decimals0 - decimals1 - db.kv[key] = decimals - log.debug(f'pool decimals: {decimals0} - {decimals1}') - log.debug(f'pool decimals {addr} {decimals}') - return decimals - - class _UniswapContracts (ByBlockchainDict[ContractProxy]): def __init__(self): diff --git a/src/dexorder/util/convert.py b/src/dexorder/util/convert.py index c87b764..4bbe183 100644 --- a/src/dexorder/util/convert.py +++ b/src/dexorder/util/convert.py @@ -21,7 +21,7 @@ def tick_to_sqrtPriceX96(tick): return round(math.sqrt(tick_to_price(tick)) * 2**96) def encode_IEEE754(value: float) -> int: - return struct.pack('>I', struct.pack('>f', value))[0] + return struct.unpack('>I', struct.pack('>f', value))[0] def decode_IEEE754(value: int) -> float: return struct.unpack('>f', struct.pack('>I', value))[0]