import logging
import math

import numpy as np
import pandas as pd

try:
    import matplotlib.pyplot as plt
except ImportError:
    # Plotting is only needed by main(); the pricing kernel below relies on
    # `math` alone, so keep the module importable in headless environments.
    plt = None

log = logging.getLogger(__name__)


def lmsr_swap_amount_out(
    balances,
    amount_in,
    token_in_index,
    token_out_index,
    lp_fee,
    kappa,
):
    """
    Compute the LMSR amountOut for swapping `amount_in` of token
    `token_in_index` into token `token_out_index`.

    The LP fee is charged on the input (amount_in_net = amount_in *
    (1 - lp_fee)) and the fee-free kernel is then applied to the net input:

        amountOut = b * ln(1 + r0 * (1 - exp(-a / b)))

    where a = amount_in_net, r0 = exp((q_i - q_j) / b) and b = kappa * S with
    S = sum(balances).

    The kernel is evaluated with math.expm1 / math.log1p so that trades which
    are tiny relative to b do not collapse to zero through floating-point
    cancellation.

    Everything is done in native Python floats; no integer/unit conversion,
    rounding direction or fee quantization is emulated here.

    Parameters:
    - balances: iterable of per-token balances q_i (anything float() accepts).
    - amount_in: input amount supplied by the swapper (before fees).
    - token_in_index: index of the input token i.
    - token_out_index: index of the output token j.
    - lp_fee: fractional LP fee applied to the input, in [0, 1) (e.g. 0.003).
    - kappa: positive liquidity parameter; b = kappa * sum(balances).

    Returns:
    - float: amount of the output token the swapper receives, capped at the
      output token's balance. 0.0 is returned when amount_in is non-positive
      (checked before kappa, balances and lp_fee are validated), when the net
      input is non-positive, when the output balance is non-positive, or when
      the computed output is non-positive.

    Raises:
    - ValueError: an input cannot be converted to float, kappa <= 0,
      sum(balances) <= 0, kappa * sum(balances) <= 0, or lp_fee is outside
      [0, 1).
    - IndexError: a token index is outside range(len(balances)).
    - ArithmeticError: exp((q_i - q_j) / b) overflows a float.
    """
    # Normalize inputs to floats up front so the arithmetic below is uniform.
    try:
        q = [float(x) for x in balances]
        a_in = float(amount_in)
        lpf = float(lp_fee)
        k = float(kappa)
    except (TypeError, ValueError) as e:
        raise ValueError("Invalid numeric input") from e

    n = len(q)
    if not (0 <= token_in_index < n and 0 <= token_out_index < n):
        raise IndexError("token indices out of range")

    # A non-positive input is a no-op swap; this is decided before the
    # remaining parameters are validated.
    if a_in <= 0.0:
        return 0.0

    if k <= 0.0:
        raise ValueError("kappa must be positive")

    # Size metric S = sum q_i
    S = sum(q)
    if S <= 0.0:
        raise ValueError("size metric (sum balances) must be positive")

    # b = kappa * S (the product can still underflow to 0.0)
    b = k * S
    if b <= 0.0:
        raise ValueError("computed b must be positive")

    # Fee-on-input: the kernel itself is fee-free.
    if not (0.0 <= lpf < 1.0):
        raise ValueError("lp_fee must be in [0, 1)")
    a_net = a_in * (1.0 - lpf)
    if a_net <= 0.0:
        return 0.0

    qi = q[token_in_index]
    qj = q[token_out_index]

    # Nothing can be withdrawn from an empty output reserve.
    if qj <= 0.0:
        return 0.0

    # r0 = exp((q_i - q_j) / b)
    try:
        r0 = math.exp((qi - qj) / b)
    except OverflowError as e:
        raise ArithmeticError("exponential overflow in r0 computation") from e

    # 1 - exp(-a/b), computed via expm1 to keep full precision when a/b is
    # tiny. a/b > 0 here, so the value lies in [0, 1] and merely underflows
    # towards 1.0 for very large inputs; it cannot overflow.
    one_minus_exp = -math.expm1(-(a_net / b))

    # amountOut = b * ln(1 + r0 * (1 - exp(-a/b))). The log1p argument is
    # non-negative (r0 > 0), so the logarithm is always defined.
    amount_out = b * math.log1p(r0 * one_minus_exp)

    # Safety: non-positive output -> return zero
    if amount_out <= 0.0:
        return 0.0

    # Cannot withdraw more than the pool currently holds of the output token.
    if amount_out > qj:
        return float(qj)

    return float(amount_out)


def main(csv_path='swap_results_block_23640998.csv'):
    """
    Plot LMSR slippage against slippage derived from a CSV of swap results.

    The LMSR curve is computed for a two-token pool with equal balances over a
    geometric range of input sizes. The CSV curve (labelled 'CP') is read from
    `csv_path`; its columns are renamed positionally to
    block, price0, price1, in0, out0, in1, out1, so the file is assumed to
    contain exactly those seven columns in that order (TODO confirm against
    the producer of the file). Regions where the LMSR slippage is below the
    interpolated CSV slippage are shaded green.

    Parameters:
    - csv_path: path of the swap-results CSV; defaults to the original
      hard-coded file name.

    Raises:
    - ImportError: matplotlib is not installed.
    """
    if plt is None:
        raise ImportError("matplotlib is required for plotting")

    balance0 = 1_000_000  # estimated from the production pool
    balances = [balance0, balance0]
    lp_fee = 0.000010
    kappa = 0.8

    X = np.geomspace(1, 10_000_000, 100)
    # Slippage as a fraction of the input: 1 - amountOut / amountIn.
    # Kept as an ndarray so the comparison with the interpolated curve below
    # is an explicit element-wise operation.
    Y = np.array([
        1 - lmsr_swap_amount_out(
            balances, float(amount_in), 0, 1, lp_fee, kappa
        ) / amount_in
        for amount_in in X
    ])
    plt.plot(X / balance0, Y, label='LMSR')

    d = pd.read_csv(csv_path)
    d.columns = ['block', 'price0', 'price1', 'in0', 'out0', 'in1', 'out1']
    # Reference price comes from the first row of the file; capture it before
    # any reordering.
    reference_price = d.iloc[0].price0
    # np.interp requires increasing x-coordinates and does not check for it.
    d = d.sort_values('in0')
    uniswap_slippage = 1 - d.out0 / d.in0 / reference_price
    plt.plot(d.in0 / balance0, uniswap_slippage, label='CP')

    # Interpolate Uniswap slippage to match LMSR x-coordinates
    interp_uniswap = np.interp(
        X / balance0,
        (d.in0 / balance0).to_numpy(),
        uniswap_slippage.to_numpy(),
    )
    mask = Y < interp_uniswap
    plt.fill_between(X / balance0, 0, 1, where=mask, alpha=0.2, color='green')

    plt.xscale('log')
    plt.yscale('log')
    # x is a fraction of the initial balance -> show in basis points.
    plt.gca().xaxis.set_major_formatter(
        plt.FuncFormatter(lambda x, _: '{:.2f}'.format(10000 * x))
    )
    # y is a fractional slippage -> scale by 100 so the '%' label is correct.
    plt.gca().yaxis.set_major_formatter(
        plt.FuncFormatter(lambda y, _: '{:.2f}%'.format(100 * y))
    )
    plt.xlabel('Input Amount (basis points of initial balance)')
    plt.ylabel('Slippage')
    plt.title('Pool Slippages')
    plt.grid(True)
    plt.legend()
    plt.show()


if __name__ == '__main__':
    main()