OHLC bugfixes
This commit is contained in:
@@ -34,6 +34,7 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
|
||||
data = await get_uniswap_data(swap)
|
||||
if data is not None:
|
||||
pool, time, price = data
|
||||
log.debug(f'OHLC {pool["address"]} {time} {price}')
|
||||
ohlcs.light_update_all(pool['address'], time, price)
|
||||
|
||||
def flush_callback():
|
||||
|
||||
@@ -67,7 +67,8 @@ async def main():
|
||||
log.debug(f'\tmirror result {mirror_addr} {mirror_inverted}')
|
||||
mirror_pools.append((mirror_addr, mirror_inverted))
|
||||
await write_metadata(pools, mirror_pools)
|
||||
log.info('Mirroring pools')
|
||||
delay = config.polling if config.polling > 0 else 1
|
||||
log.info(f'Mirroring pools every {delay} seconds')
|
||||
while True:
|
||||
proms = []
|
||||
for pool_addr, (mirror_addr, mirror_inverted) in zip(pools, mirror_pools):
|
||||
@@ -78,7 +79,7 @@ async def main():
|
||||
except Exception as x:
|
||||
log.exception(x)
|
||||
await asyncio.gather(*[await_mirror(*args) for args in proms])
|
||||
await asyncio.sleep(config.polling if config.polling > 0 else 1)
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -387,8 +387,13 @@ class OHLCRepository:
|
||||
for chunk in self.dirty_chunks:
|
||||
chunk.save()
|
||||
self.dirty_chunks.clear()
|
||||
with open(os.path.join(self.dir, quotes_path()), 'w') as f:
|
||||
json.dump(self.quotes, f)
|
||||
filepath = os.path.join(self.dir, quotes_path())
|
||||
for _ in range(2):
|
||||
try:
|
||||
with open(filepath, 'w') as f:
|
||||
json.dump(self.quotes, f)
|
||||
except FileNotFoundError:
|
||||
os.makedirs(os.path.dirname(filepath), exist_ok=True)
|
||||
|
||||
|
||||
class LightOHLCRepository (OHLCRepository):
|
||||
@@ -409,14 +414,17 @@ class LightOHLCRepository (OHLCRepository):
|
||||
if price is not None:
|
||||
self.quotes[symbol] = timestamp(time), str(price)
|
||||
start = ohlc_start_time(time, period)
|
||||
log.debug(f'OHLC start_time {start}')
|
||||
chunk = self.get_chunk(symbol, period, start)
|
||||
key = symbol, period
|
||||
prev = self.current.get(key)
|
||||
if prev is None:
|
||||
# cache miss. load from chunk.
|
||||
prev = self.current[key] = chunk.bar_at(start)
|
||||
log.debug(f'loaded prev bar from chunk {prev}')
|
||||
if prev is None:
|
||||
# not in cache or chunk. create new bar.
|
||||
log.debug(f'no prev bar')
|
||||
if price is not None:
|
||||
close = price
|
||||
else:
|
||||
@@ -425,7 +433,7 @@ class LightOHLCRepository (OHLCRepository):
|
||||
except KeyError:
|
||||
log.warning(f'light_update tried to advance time on {symbol} which has no previous price.')
|
||||
return # no previous quote, no current price either
|
||||
bar = NativeOHLC(start, price, price, price, close)
|
||||
bar = self.current[key] = NativeOHLC(start, price, price, price, close)
|
||||
chunk.update(bar, backfill=backfill)
|
||||
self.dirty_chunks.add(chunk)
|
||||
else:
|
||||
@@ -434,6 +442,7 @@ class LightOHLCRepository (OHLCRepository):
|
||||
chunk = self.get_chunk(symbol, period, bar.start)
|
||||
chunk.update(bar, backfill=backfill)
|
||||
self.dirty_chunks.add(chunk)
|
||||
self.current[key] = updated[-1]
|
||||
|
||||
|
||||
def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]):
|
||||
|
||||
Reference in New Issue
Block a user