import logging import sys from dexorder import DELETE, NARG from dexorder.base.chain import current_chain, Mockchain from dexorder.blockstate import BlockState, BlockDict, current_blockstate from dexorder.blockstate.branch import Branch from dexorder.blockstate.fork import current_fork, Fork logging.basicConfig(level=logging.INFO, stream=sys.stdout) logging.getLogger('dexorder').setLevel(logging.DEBUG) current_chain.set(Mockchain) b0 = bytes([0]) # genesis block hash root_branch = Branch(0, 0, bytes(), [b0]) def new_state(): state = BlockState() state.add_branch(root_branch) current_blockstate.set(state) return state s = new_state() series_name = 'test' series = BlockDict(series_name) def get(fork: Fork, default=NARG): value = s.get(fork, series_name, 'foo', default) # print(f'{fork} => {value}') return value block_data = {} def make_block(num: int, data: dict=None): key = bytes([num]) block_data[key] = data if data is not None else dict(foo=hex(num)[2:]) return key # blocks are by height and then an a-b-c fork # by default, each block sets foo= b1a = make_block(0x1a) b2a = make_block(0x2a) b3a = make_block(0x3a) b4a = make_block(0x4a) b5a = make_block(0x5a) def make_branch(state: BlockState, height: int, start: int, parent: bytes, path: list[bytes]): branch = Branch(height, start, parent, path) fork = state.add_branch(branch) current_fork.set(fork) for block_id in reversed(branch.path): for k,v in block_data[block_id].items(): series[k] = v return fork fork_a = make_branch(s, 5, 1, b0, [b5a, b4a, b3a, b2a, b1a]) fork_a1 = make_branch(s, 1, 1, b0, [b1a]) fork_a2 = make_branch(s, 2, 2, b1a, [b2a]) fork_a3 = make_branch(s, 3, 3, b2a, [b3a]) fork_aa = make_branch(s, 3, 1, b0, [b3a, b2a, b1a]) fork_ab = make_branch(s, 5, 4, b3a, [b5a, b4a]) # this fork has multiple branch combinations. the algo should prefer using fewer branches. assert fork_ab.branches[1] == fork_aa.branch assert get(fork_a) == '5a' assert get(fork_aa) == '3a' assert get(fork_ab) == '5a' # now change the current value at the end of fork_a current_fork.set(fork_a) diff_count = len(s.diffs_by_branch[fork_a.branch_id]) series['foo'] = 'not' assert get(fork_a) == 'not' series['foo'] = 'bar' assert get(fork_a) == 'bar' # make sure it didn't create any extra diffs but performed value replacement in the DiffEntry instead assert diff_count == len(s.diffs_by_branch[fork_a.branch_id]) # chain B does nothing until deleting foo at height 3, then it sets it back at height 5 # block 1 is taken from a-chain b2b = make_block(0x2b, {}) b3b = make_block(0x3b, dict(foo=DELETE)) b4b = make_block(0x4b, {}) b5b = make_block(0x5b) fork_from_a = make_branch(s, 2, 2, b1a, [b2b]) # this fork should have joined the branch from fork_a1, which connects to genesis for a total of three branches assert len(fork_from_a.branches) == 3 assert fork_from_a.branches[1] == fork_a1.branch # the value should have carried over from the other branch assert get(fork_from_a) == '1a' fork_delete = make_branch(s, 4, 3, b2b, [b4b, b3b]) missing = 'missing' assert get(fork_delete, missing) is missing # make sure it throws KeyError since the key is deleted try: found = series['foo'] assert False except KeyError: pass # restore the 'foo' key with a value of '5b' fork_restore = make_branch(s, 5, 5, b4b, [b5b]) assert get(fork_restore) == '5b' s.promote_root(fork_aa) # test garbage collection diffs = s.diffs_by_series[series_name].get('foo') assert diffs assert diffs[-1].height == 3 # only the very latest value should be maintained try: s.promote_root(fork_from_a) assert False # fork B should not be able to be promoted except AssertionError: pass # chain C b1c = make_block(0x1c) b2c = make_block(0x2c) b3c = make_block(0x3c) b4c = make_block(0x4c) b5c = make_block(0x5c) logging.getLogger('dexorder').error('Insufficient number of test cases')