From a1ba66787a97ddb6555c8cc6ee1c30bcd70f9c3f Mon Sep 17 00:00:00 2001 From: tim Date: Mon, 5 Aug 2024 00:25:05 -0400 Subject: [PATCH] finalohlc bugfixes --- src/dexorder/final_ohlc.py | 48 +++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/src/dexorder/final_ohlc.py b/src/dexorder/final_ohlc.py index 2a3c247..469de4e 100644 --- a/src/dexorder/final_ohlc.py +++ b/src/dexorder/final_ohlc.py @@ -73,12 +73,15 @@ class OHLCFilePath: class OHLCFile: + @staticmethod def get(base_dir: str, filepath: OHLCFilePath): key = base_dir, filepath - if key not in OHLCFile.cache: - OHLCFile.cache[key] = OHLCFile(base_dir, filepath) - return OHLCFile.cache[key] + if key in OHLCFile._cache: + return OHLCFile._cache[key] + item = OHLCFile._closing.pop(key, False) or OHLCFile(base_dir, filepath) + OHLCFile._cache[key] = item + return item def __init__(self, base_dir: str, filepath: OHLCFilePath): self.base_dir = base_dir @@ -93,7 +96,11 @@ class OHLCFile: key, item = super().popitem() item.close() - cache = OHLCFileCache(len(OHLC_PERIODS) * config.chunk_cache_size) + _cache = OHLCFileCache(len(OHLC_PERIODS) * config.chunk_cache_size) + + # We must wait to close files until the end of a flush, in case they have pending data. These items are no longer + # in the cache, but they are still active and not yet closed. + _closing:dict[tuple[str,OHLCFilePath],:'OHLCFile'] = {} @property def filename(self): @@ -175,9 +182,9 @@ class OHLCFile: else: # load existing file line_number = 0 - row = None + row = None # this will be the last row of the file that is not earlier than the earliest change prev_line = None - offset = 0 # this will point to the start of the last row + offset = self.file.tell() # this will end up pointing to the start of the last row for line in self.file.readlines(): # all files should be small and can load at once line_number += 1 row = line.decode('ascii').strip().split(',') @@ -186,19 +193,28 @@ class OHLCFile: if prev_line is not None: # advance cursor past the previous row offset += len(prev_line) - if int(row[0]) >= earliest_change: - # we are going to replace this row so stop walking forward and truncate the rest of the file - self.file.truncate(offset) - break + try: + if int(row[0]) >= earliest_change: + # we are going to replace this row so stop walking forward and truncate the rest of the file + self.file.truncate(offset) + break + except: + log.error(f'Couldn\'t read line {line_number} in {self.filename} row {row}') + raise prev_line = line if row is not None: - self._cur = [int(row[0]), *(dec(p) for p in row[1:])] # convert to int timestamp and dec prices + try: + self._cur = [int(row[0]), *(dec(p) for p in row[1:])] # convert to int timestamp and dec prices + except: + log.error(f'problem in {self.filename}:{line_number} row {row}') + raise # set the file's write pointer the start of the final row self.file.seek(offset) # self._cur is now either None (empty file) or points to a natively-typed list representing the last row + def close(self): - self.file.close() + OHLCFile._closing[self.base_dir, self.filepath] = self class OHLCFileSeries: @@ -232,7 +248,6 @@ class OHLCFileSeries: # load quote file # if self.quote_file is None: - quote_filename = self.quote_filename try: self.quote_file = open(quote_filename, 'br+') @@ -337,4 +352,9 @@ class FinalOHLCRepository: for series in self.dirty_series: series.flush() self.dirty_series.clear() - + # We must wait to close files until the end of a flush, in case they have pending data. + # noinspection PyProtectedMember + for closing in OHLCFile._closing.values(): + closing.file.close() + # noinspection PyProtectedMember + OHLCFile._closing.clear()