dxod repo init

This commit is contained in:
tim
2025-09-15 14:21:56 -04:00
commit 5fb2b17b2e
18 changed files with 4996 additions and 0 deletions

View File

@@ -0,0 +1,354 @@
# lmsr_vs_cp_sim.py
# Requires: Python 3.9+, numpy, matplotlib
# Optional: seaborn (for prettier plots)
import math
import numpy as np
import matplotlib.pyplot as plt
try:
import seaborn as sns
sns.set_context("talk")
sns.set_style("whitegrid")
except Exception:
pass
# ---------------------------
# Core AMM primitives
# ---------------------------
def cp_price(X, Y):
# Instantaneous marginal price p = dy/dx for constant product
return Y / X
def cp_trade_y_out(X, Y, qx_in, fee=0.0):
"""
Swap x-in (amount qx_in) for y-out on a constant-product pool with fee.
Fee taken on input (Uniswap-style): only (1-fee)*qx_in enters the invariant.
Returns (dy_out, X_new, Y_new, fee_value).
"""
assert X > 0 and Y > 0 and qx_in >= 0 and 0 <= fee < 1
k = X * Y
effective_in = qx_in * (1 - fee)
X_new = X + effective_in
Y_new = k / X_new
dy_out = Y - Y_new
fee_value = qx_in * fee # in x-units
return dy_out, X_new, Y_new, fee_value
def cp_avg_y_per_x(X, Y, qx_in, fee=0.0):
if qx_in == 0:
return cp_price(X, Y)
dy, *_ = cp_trade_y_out(X, Y, qx_in, fee)
return dy / qx_in
def lmsr_calibrate_b(X, Y, mode="thin_side", factor=0.5):
"""
Calibrate LMSR 'b' to match CP local log-price stiffness at the current state.
- thin_side: use min(X, Y) * factor where factor≈0.5 matches ds/dq at that side
- x_side: use X * factor
- y_side: use Y * factor (useful if measuring q in y-space)
For our use (q measured in x), 'x_side' or 'thin_side' with factor=0.5 are typical.
"""
if mode == "thin_side":
return max(1e-12, factor * min(X, Y))
elif mode == "x_side":
return max(1e-12, factor * X)
elif mode == "y_side":
return max(1e-12, factor * Y)
else:
raise ValueError("mode must be one of ['thin_side','x_side','y_side']")
def lmsr_y_out(qx_in, p0, b, fee=0.0):
"""
Symmetric 2-asset LMSR approximation in x-measure:
- We model log-price s moving linearly in q_x: ds/dq_x = 1/b
- Then price path p(q) = p0 * exp(q/b)
- Cumulative y received for x-in q is integral of p(q) dq: y(q) = p0 * b * (exp(q/b) - 1)
Fee applied to input reduces effective q: q_eff = q * (1 - fee)
Returns dy_out (in y-units) and fee_value (in x-units)
"""
assert qx_in >= 0 and b > 0 and p0 > 0 and 0 <= fee < 1
q_eff = qx_in * (1 - fee)
dy = p0 * b * (math.exp(q_eff / b) - 1.0)
fee_value = qx_in * fee
return dy, fee_value
def lmsr_avg_y_per_x(qx_in, p0, b, fee=0.0):
if qx_in == 0:
return p0
dy, _ = lmsr_y_out(qx_in, p0, b, fee)
return dy / qx_in
# ---------------------------
# Static comparison utilities
# ---------------------------
def static_compare(X, Y, fee_cp=0.0005, fee_lmsr=0.0005, b=None, b_mode="x_side", b_factor=0.5,
q_grid=None):
"""
Compute average execution (y per x), penalties vs spot, and welfare gaps over a grid of trade sizes.
Returns dict with arrays.
"""
p0 = cp_price(X, Y)
if b is None:
b = lmsr_calibrate_b(X, Y, mode=b_mode, factor=b_factor)
if q_grid is None:
# span from tiny to a meaningful fraction of X
q_grid = np.geomspace(1e-9 * X, 0.3 * X, 60)
avg_cp = np.array([cp_avg_y_per_x(X, Y, q, fee_cp) for q in q_grid])
avg_lm = np.array([lmsr_avg_y_per_x(q, p0, b, fee_lmsr) for q in q_grid])
# Slippage penalty vs spot p0 (in y-per-x)
pen_cp = p0 - avg_cp
pen_lm = p0 - avg_lm
# Taker welfare difference: LMSR minus CP (positive means LMSR better for taker)
welfare_gap = avg_lm - avg_cp
return {
"q_grid": q_grid,
"avg_cp": avg_cp,
"avg_lm": avg_lm,
"pen_cp": pen_cp,
"pen_lm": pen_lm,
"welfare_gap": welfare_gap,
"p0": p0,
"b": b
}
# ---------------------------
# Imbalance sweep
# ---------------------------
def sweep_imbalance(X, rho_grid, fee_cp=0.0005, fee_lmsr=0.0005, q_frac=0.01,
b_mode="thin_side", b_factor=0.5):
"""
For a fixed X (x-reserve), sweep imbalance rho = Y/X and measure welfare advantage at a chosen trade size q = q_frac * X.
Returns arrays over rho.
"""
q = q_frac * X
wgap = []
pen_cp_list, pen_lm_list, p0_list, b_list = [], [], [], []
for rho in rho_grid:
Y = rho * X
res = static_compare(X, Y, fee_cp, fee_lmsr, b=None, b_mode=b_mode, b_factor=b_factor, q_grid=np.array([q]))
wgap.append(res["welfare_gap"][0])
pen_cp_list.append(res["pen_cp"][0])
pen_lm_list.append(res["pen_lm"][0])
p0_list.append(res["p0"])
b_list.append(res["b"])
return {
"rho_grid": rho_grid,
"q": q,
"welfare_gap": np.array(wgap),
"pen_cp": np.array(pen_cp_list),
"pen_lm": np.array(pen_lm_list),
"p0": np.array(p0_list),
"b": np.array(b_list)
}
# ---------------------------
# Sequential Monte Carlo simulation
# ---------------------------
def sample_trade_sizes(n, X, dist="lognormal", mean_frac=0.005, std_frac=0.01, seed=None):
"""
Return non-negative trade sizes q in x-units.
- lognormal: parameters chosen so mean ≈ mean_frac*X
- uniform: U(0, 2*mean_frac*X)
"""
rng = np.random.default_rng(seed)
if dist == "lognormal":
mean = mean_frac * X
std = std_frac * X
# Map mean/std to lognormal parameters
# mean = exp(mu + sigma^2/2), var = (exp(sigma^2)-1)exp(2mu+sigma^2)
# Let CV = std/mean => sigma^2 = ln(1+CV^2), mu = ln(mean) - sigma^2/2
cv = std / max(1e-12, mean)
sigma2 = math.log(1 + cv * cv)
sigma = math.sqrt(sigma2)
mu = math.log(max(1e-12, mean)) - sigma2 / 2
q = rng.lognormal(mean=mu, sigma=sigma, size=n)
return q
elif dist == "uniform":
return rng.uniform(0, 2 * mean_frac * X, size=n)
else:
raise ValueError("Unknown dist")
def sequential_sim(X0, Y0, n_trades=2000, direction_bias=0.6,
fee_cp=0.0005, fee_lmsr=0.0005,
b_mode="thin_side", b_factor=0.5,
dist="lognormal", mean_frac=0.005, std_frac=0.01, seed=42):
"""
Run a sequential simulation where each trade either buys y with x (prob=direction_bias)
or buys x with y (prob=1-direction_bias). We compare CP vs LMSR per-trade and accumulate:
- taker surplus difference (y-per-x times q, converted to y-units)
- fee revenue for LPs (x- or y-denominated; we also track value at spot)
- pool state evolution (for CP only; LMSR is state-less under this approximation)
Note: We approximate LMSR as state-less with ds/dq = 1/b, b recalibrated each step to the thin side.
"""
rng = np.random.default_rng(seed)
X_cp, Y_cp = X0, Y0
p_history = []
b_history = []
taker_y_advantage = 0.0
lp_fee_x_cp = 0.0
lp_fee_x_lm = 0.0
# Track realized slippage stats at fixed snapshot prices for comparability
for t in range(n_trades):
p_cp = cp_price(X_cp, Y_cp)
p_history.append(p_cp)
# Calibrate LMSR b to thin side each step
b = lmsr_calibrate_b(X_cp, Y_cp, mode=b_mode, factor=b_factor)
b_history.append(b)
# Decide direction: True => use x to buy y; False => use y to buy x
buy_y = rng.random() < direction_bias
if buy_y:
# Draw trade size in x-units
qx = float(sample_trade_sizes(1, X_cp, dist, mean_frac, std_frac, seed=rng.integers(1e9))[0])
# CP execution
dy_cp, X_cp_new, Y_cp_new, fee_x_cp = cp_trade_y_out(X_cp, Y_cp, qx, fee_cp)
# LMSR execution (approximate with current spot p_cp)
dy_lm, fee_x_lm = lmsr_y_out(qx, p_cp, b, fee_lmsr)
# Update CP state
X_cp, Y_cp = X_cp_new, Y_cp_new
# Accumulate
taker_y_advantage += (dy_lm - dy_cp)
lp_fee_x_cp += fee_x_cp
lp_fee_x_lm += fee_x_lm
else:
# Buy x with y. Mirror the model by symmetry:
# We'll convert problem by swapping roles of (x,y) and using same formulas.
# Draw trade size in y-units proportional to Y
qy = float(sample_trade_sizes(1, Y_cp, dist, mean_frac, std_frac, seed=rng.integers(1e9))[0])
# CP: y-in, x-out
# Symmetric function by swapping X<->Y and interpreting result
dy_dummy, Y_new, X_new, fee_y_cp = cp_trade_y_out(Y_cp, X_cp, qy, fee_cp) # returns x-out in "dy_dummy"
dx_cp = dy_dummy # interpret as x-out
X_cp, Y_cp = X_new, Y_new
# LMSR: state-less approx using ds/dq_y = 1/b_y; we map via price and symmetry.
# For buy-x with y-in, average x per y uses 1/p along exponential path in y-space.
# For simplicity, we mirror by computing y-per-x with LMSR at price p, then invert locally:
# Expected x received ≈ qy / p * (e^{(qy_eff/b_y)/?} - 1)/((qy)/?) ... too detailed.
# Instead, use symmetry by swapping labels and reusing the function with p_inv = 1/p.
p_inv = 1.0 / p_cp
b_y = lmsr_calibrate_b(X_cp, Y_cp, mode="y_side", factor=b_factor)
dx_lm, fee_y_lm = lmsr_y_out(qy, p_inv, b_y, fee_lmsr) # gives x-out
# Taker advantage in x-units; convert to y-units for aggregation using pre-trade spot
taker_y_advantage += (dx_lm - dx_cp) * p_cp # multiply by p to convert x to y at current spot
lp_fee_x_cp += fee_y_cp * p_inv # convert y-fee to x using p_inv
lp_fee_x_lm += fee_y_lm * p_inv
return {
"taker_y_advantage": taker_y_advantage,
"lp_fee_x_cp": lp_fee_x_cp,
"lp_fee_x_lm": lp_fee_x_lm,
"p_history": np.array(p_history),
"b_history": np.array(b_history),
"final_state_cp": (X_cp, Y_cp)
}
# ---------------------------
# Plotting helpers
# ---------------------------
def plot_static(res):
q = res["q_grid"]
p0 = res["p0"]
plt.figure(figsize=(10, 6))
plt.plot(q, res["avg_cp"], label="CP avg y-per-x")
plt.plot(q, res["avg_lm"], label="LMSR avg y-per-x")
plt.axhline(p0, color="gray", linestyle="--", alpha=0.6, label="Spot p0")
plt.xscale("log")
plt.title("Average execution (y per x) vs trade size")
plt.xlabel("q_x (trade size)")
plt.ylabel("y per x")
plt.legend()
plt.tight_layout()
plt.figure(figsize=(10, 6))
plt.plot(q, res["welfare_gap"] / res["p0"] * 1e4)
plt.xscale("log")
plt.axhline(0, color="gray", linestyle="--")
plt.title("Taker welfare advantage LMSRCP (in bp of price)")
plt.xlabel("q_x (trade size)")
plt.ylabel("bp")
plt.tight_layout()
def plot_imbalance(sweep):
rho = sweep["rho_grid"]
plt.figure(figsize=(10, 6))
plt.plot(rho, sweep["welfare_gap"] / sweep["p0"] * 1e4)
plt.axhline(0, color="gray", linestyle="--")
plt.title(f"LMSRCP taker advantage vs imbalance (q={sweep['q']:.4g}·X)")
plt.xlabel("rho = Y/X (imbalance)")
plt.ylabel("bp of price")
plt.tight_layout()
# ---------------------------
# Demo main
# ---------------------------
def demo():
# Baseline pool
X, Y = 10_000.0, 10_000.0
fee_cp = 0.0005 # 5 bp
fee_lmsr = 0.0005 # 5 bp
b_mode = "thin_side"
b_factor = 0.5
# Static comparison across trade sizes
res = static_compare(
X, Y, fee_cp=fee_cp, fee_lmsr=fee_lmsr,
b=None, b_mode=b_mode, b_factor=b_factor
)
print(f"Spot p0={res['p0']:.6f}, calibrated b={res['b']:.6f}")
plot_static(res)
# Imbalance sweep
rho_grid = np.linspace(0.2, 5.0, 60) # from thin-y to thin-x
sw = sweep_imbalance(
X, rho_grid, fee_cp=fee_cp, fee_lmsr=fee_lmsr,
q_frac=0.01, b_mode=b_mode, b_factor=b_factor
)
plot_imbalance(sw)
# Sequential simulation
sim = sequential_sim(
X0=X, Y0=Y, n_trades=2000, direction_bias=0.6,
fee_cp=fee_cp, fee_lmsr=fee_lmsr,
b_mode=b_mode, b_factor=b_factor,
dist="lognormal", mean_frac=0.005, std_frac=0.01, seed=7
)
adv_bp_equiv = sim["taker_y_advantage"] / (X + Y / res["p0"]) * 1e4
print(f"Sequential sim taker Y-advantage (absolute): {sim['taker_y_advantage']:.6f} y-units")
print(f"LP fee (CP) in x-units: {sim['lp_fee_x_cp']:.6f}")
print(f"LP fee (LMSR) in x-units: {sim['lp_fee_x_lm']:.6f}")
plt.show()
if __name__ == "__main__":
demo()