"""FastAPI server for BazaarBot environment.""" from __future__ import annotations import copy import json import os from contextlib import asynccontextmanager from typing import Optional from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from pydantic import BaseModel from .arena import MultiBuyerArena from .environment import BazaarEnvironment from .leaderboard import get_best_scores, get_leaderboard, record_score from .models import ( ActionType, ArenaAction, ArenaState, BazaarAction, BazaarObservation, BazaarReward, CounterfactualRequest, CounterfactualResult, DealOutcome, EnvironmentState, LeaderboardEntry, LeaderboardResponse, SellerPersonalityType, ) from .tasks import GRADERS, TASKS # ── Request / Response models ───────────────────────────────────── class ResetRequest(BaseModel): task: str = "single_deal" seed: Optional[int] = None seller_personality: Optional[str] = None # override task default class ResetResponse(BaseModel): observation: BazaarObservation done: bool = False reward: float = 0.0 class StepRequest(BaseModel): action: str # "offer", "accept", "walk" price: Optional[float] = None class StepResponse(BaseModel): observation: BazaarObservation reward: float done: bool info: dict = {} class ScoreResponse(BaseModel): task: str score: float episodes_completed: int total_episodes: int success: bool class RecordScoreRequest(BaseModel): agent_name: str metadata: dict = {} # Arena models class ArenaCreateRequest(BaseModel): task: str = "marketplace_arena" seed: Optional[int] = None num_buyers: int = 3 class ArenaJoinRequest(BaseModel): buyer_id: str name: str = "Buyer" is_human: bool = False class ArenaStepRequest(BaseModel): actions: dict[str, dict] # buyer_id -> {action, price, signal} # ── App state ───────────────────────────────────────────────────── def _client_ip(request: Request) -> Optional[str]: """Best-effort client IP for rate-limiting. Honors X-Forwarded-For when deployed behind a proxy/CDN; falls back to direct socket peer. Note: in untrusted environments XFF can be spoofed. Hosting plan today is direct uvicorn or behind a single-hop reverse proxy we control, so trusting the leftmost XFF entry is acceptable. """ xff = request.headers.get("x-forwarded-for") if xff: return xff.split(",")[0].strip() or None return request.client.host if request.client else None _envs: dict[str, BazaarEnvironment] = {} _arenas: dict[str, MultiBuyerArena] = {} _ws_connections: dict[str, list[WebSocket]] = {} @asynccontextmanager async def lifespan(app: FastAPI): yield _envs.clear() _arenas.clear() app = FastAPI( title="BazaarBot", description="OpenEnv negotiation environment with game-theory mechanics, seller personalities, tells, and multi-buyer arenas", version="2.0.0", lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def _get_env(session_id: str = "default") -> BazaarEnvironment: if session_id not in _envs: raise HTTPException(status_code=400, detail="No active session. Call /reset first.") return _envs[session_id] # ── WebSocket broadcasting ─────────────────────────────────────── async def _broadcast(session_id: str, event: str, data: dict): """Broadcast event to all WebSocket clients watching a session.""" conns = _ws_connections.get(session_id, []) dead = [] for ws in conns: try: await ws.send_json({"event": event, **data}) except Exception: dead.append(ws) for ws in dead: conns.remove(ws) # ── Endpoints ───────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def root(): task_rows = "".join( f'{name}{t.difficulty.capitalize()}' f'{t.seller_personality.value}{t.description}' for name, t in TASKS.items() ) return f""" BazaarBATNA — OpenEnv negotiation environment
OpenEnv · Negotiation Playground

Watch agents haggle.
Step in yourself.

A negotiation environment with observable tells and hidden reservation prices. Buyer and seller are both LLMs — Sauda on the buy side (Llama-3.1-8B + QLoRA, trained SFT → GRPO → DPO/RLAIF), Gemma-4-E4B on the sell side. Strategy improves through self-play. Drop in as a seller, watch the arena, or scrub a replay.

Powered by RLAIF OpenEnv-compliant 8B · QLoRA 8 tasks · 4 personas
Headline result

Sauda v2 beats the 8B base by 7.4% mean surplus

Same seller (Gemma-4-E4B), same seeds, same tasks. n=30 episodes per task. Sauda was trained on top of Llama-3.1-8B-Instruct with SFT + GRPO; the table below shows it outperforms the base model on every task it was trained against, and survives the seller-quality eval (5 of 6 acceptance criteria pass).

BuyerTellssingle_dealasymmetricamazonMeanDealsRounds
Llama-3.2-3B baseON0.7220.7310.2580.5701.002.2
Llama-3.1-8B baseON0.8180.7870.4300.6780.993.1
Sauda v2 (8B SFT+GRPO)OFF0.8350.8270.5210.7280.916.0
Sauda v2 (8B SFT+GRPO)ON0.8100.7680.5070.6950.886.0

Reading this: 3B → 8B base buys you +19% mean surplus. Training on 8B (SFT+GRPO) buys you another +7% AND ~2× longer negotiations — base models capitulate fast (2-3 rounds), Sauda actually plays the game. Sauda's deal rate (0.91) is a feature, not a bug — it walks when offers are bad. Tells channel ON underperforms tells OFF; reported as a kept negative result. Full transcripts: PayMyBills/scaling-eval-runs.

Training

SFT → GRPO → DPO/RLAIF

The buyer adapter is trained in three stages on top of Llama-3.1-8B-Instruct. SFT teaches strict-JSON Hinglish output. GRPO drives reward against the live env. DPO refines on Claude-judged preference pairs. Trainer state for the GRPO stage is on HF — anyone can curl it.

GRPO reward
0.97 peak

30 optimization steps, mean reward 0.94 across the run. Entropy fell 0.51 → 0.42 as the policy concentrated. Full log_history: trainer_state.json

Scaling-ladder win
+7.4% vs 8B base

Mean surplus across single_deal / asymmetric / amazon. Same seller, same seeds. Doubles the 3B base on the amazon task (0.258 → 0.521).

Seller quality
5 / 6 passing

Acceptance criteria for the Gemma-4-E4B seller: never accepts below reservation, never leaks reservation, monotonic counters, etc. Dataset: seller-quality-runs

The environment

8 tasks. 4 seller personas. 1 OpenEnv API.

From symmetric one-shot deals to multi-buyer marketplaces. Asymmetric information, hidden deadlines, deceptive sellers leaking poker-style tells, career history that follows the buyer across 10 deals. Every task graded with deterministic surplus + deal-rate reward.

{task_rows}
NameDifficultyPersonaWhat it tests
OpenEnv API

The endpoints judges run against

FastAPI server, Docker container, Hugging Face Space. POST /reset to start. POST /step to play. GET /score to grade. Real-time streams over WebSocket. Multi-buyer arenas. Counterfactual replays. Interactive Swagger →

POST/resetStart an episode
POST/stepSubmit buyer action
GET/stateFull env state
GET/scoreGraded score
GET/tasksList tasks
WS/ws/{{session}}Real-time stream
GET/leaderboardScore board
POST/leaderboard/recordRecord a score
POST/counterfactualWhat-if replay
POST/arena/createMulti-buyer arena
POST/arena/joinJoin arena
POST/arena/stepArena step
GET/arena/stateArena state
POST/highlightExtract seller tells
Artifacts on Hugging Face

Everything is durable. Anyone can reproduce.

Adapter

PayMyBills/bestdealbot-v2

Llama-3.1-8B + QLoRA, SFT+GRPO. trainer_state.json + last-checkpoint live for verification.

Open on HF →
Eval datasets

scaling-eval-runs

Full transcripts of the 3B / 8B / Sauda v2 scaling ladder. n=30 per task.

Open on HF →
Hackathon journal

The blog with all receipts

Bugs, the four-hour rollout we lost to a bash typo, the ablation that disproved our own hypothesis, written live.

Read on GitHub →
Training notebooks

One-click reproduce

Colab notebooks for SFT+GRPO and for DPO/RLAIF. T4-friendly, runnable end-to-end.

Open in Colab →
""" @app.get("/tasks") async def list_tasks(): return { name: { "difficulty": t.difficulty, "description": t.description, "seller_personality": t.seller_personality.value, "num_buyers": t.num_buyers, "enable_tells": t.enable_tells, "enable_coalition": t.enable_coalition, } for name, t in TASKS.items() } @app.post("/reset") async def reset(req: ResetRequest = ResetRequest()) -> ResetResponse: if req.task not in TASKS: raise HTTPException(status_code=400, detail=f"Unknown task: {req.task}. Available: {list(TASKS.keys())}") task = copy.deepcopy(TASKS[req.task]) # Override personality if specified if req.seller_personality: try: task.seller_personality = SellerPersonalityType(req.seller_personality) except ValueError: raise HTTPException( status_code=400, detail=f"Unknown personality: {req.seller_personality}. " f"Available: {[p.value for p in SellerPersonalityType]}", ) env = BazaarEnvironment(task, seed=req.seed) obs = env.reset() _envs["default"] = env await _broadcast("default", "reset", { "task": req.task, "observation": obs.model_dump(), }) return ResetResponse(observation=obs, done=False, reward=0.0) @app.post("/step") async def step(req: StepRequest) -> StepResponse: env = _get_env() try: action = BazaarAction(action=req.action, price=req.price) except Exception as e: raise HTTPException(status_code=400, detail=f"Invalid action: {e}") obs, reward_obj = env.step(action) # Broadcast via WebSocket await _broadcast("default", "step", { "round": env.current_round, "buyer_action": req.action, "buyer_price": req.price, "observation": obs.model_dump(), "reward": reward_obj.reward, "reward_components": reward_obj.components, "done": obs.done, }) # Check if all episodes are done (career mode) all_done = env.all_episodes_done if env.done and not all_done: next_obs = env.reset() await _broadcast("default", "episode_end", { "episode": env.current_episode - 1, "next_episode": env.current_episode, }) return StepResponse( observation=next_obs, reward=reward_obj.reward, done=False, info={ "episode_done": True, "episode": env.current_episode - 1, "reward_components": reward_obj.components, "next_episode": env.current_episode, }, ) return StepResponse( observation=obs, reward=reward_obj.reward, done=all_done if env.task.enable_career else obs.done, info={ "reward_components": reward_obj.components, "episode": env.current_episode, }, ) @app.get("/state") async def state() -> EnvironmentState: env = _get_env() return env.get_state() @app.get("/score") async def score() -> ScoreResponse: env = _get_env() task = env.task grader = GRADERS.get(task.name) if not grader: raise HTTPException(status_code=400, detail=f"No grader for task: {task.name}") final_score = grader(env.episode_results, task) return ScoreResponse( task=task.name, score=round(final_score, 4), episodes_completed=len(env.episode_results), total_episodes=task.total_episodes, success=final_score >= task.success_threshold, ) @app.get("/health") async def health(): return {"status": "ok", "version": "2.0.0"} # ── Highlight: span-level tell extraction for the /sell page ──── class HighlightRequest(BaseModel): message: str class HighlightSpan(BaseModel): start: int end: int text: str signal: str score: float explanation: str class HighlightResponse(BaseModel): spans: list[HighlightSpan] aggregate: dict[str, float] @app.post("/highlight", response_model=HighlightResponse) async def highlight(req: HighlightRequest): """Find tell-triggering phrases in a seller message and return char spans. Used by the /sell page to underline urgency/deception/condition phrases in the user's chat bubble after they send. Pattern-based, deterministic, no LLM call — instant. """ from nlp.keyword_patterns import find_matches, aggregate_signals matches = find_matches(req.message) return HighlightResponse( spans=[ HighlightSpan( start=m.start, end=m.end, text=m.text, signal=m.signal, score=m.score, explanation=m.explanation, ) for m in matches ], aggregate=aggregate_signals(matches), ) @app.get("/sauda/health") async def sauda_health(request: Request): """Probe both backends. Used to choose strategy and surface config errors. Public response is intentionally minimal: just a green/red signal. For the full ops view (spend, rate-limit hits, circuit-breaker state), pass the X-Sauda-Admin header matching SAUDA_ADMIN_TOKEN env-var. """ from .sauda_buyer import health as _full_health full = _full_health() admin_token = os.environ.get("SAUDA_ADMIN_TOKEN", "").strip() is_admin = bool(admin_token) and request.headers.get("x-sauda-admin", "") == admin_token if is_admin: return full # Public view: only the bits a UI needs to decide whether the live agent # is reachable. No spend numbers, no IP counts, no circuit breaker state. return { "status": "ok" if (full.get("hf_ok") or full.get("ollama_ok")) else "degraded", "live_agent_available": bool(full.get("hf_ok") or full.get("ollama_ok")), } @app.get("/sauda/backends") async def sauda_backends(): """Static metadata about available buyer backends, for the /sell UI dropdown.""" return { "backends": [ {"id": "sauda", "label": "Sauda v2 (HF Endpoint)", "primary": True, "description": "Llama-3.1-8B + SFT+GRPO LoRA, served via HF Inference Endpoint."}, {"id": "sauda_ollama", "label": "Sauda v2 (Ollama, local)", "primary": False, "description": "Same adapter, served locally via Ollama. Fallback when HF endpoint is unavailable."}, {"id": "smart", "label": "Rule-based (smart)", "primary": False, "description": "Heuristic baseline. No LLM. Always available."}, {"id": "naive", "label": "Rule-based (naive)", "primary": False, "description": "Easy buyer for seller-mode warmup."}, {"id": "aggressive", "label": "Rule-based (aggressive)", "primary": False, "description": "Hard rule-based buyer."}, ] } # ── Simulate (AI auto-play for spectator mode) ────────────────── class SimulateRequest(BaseModel): task: str = "single_deal" strategy: str = "smart" # "smart", "naive", "aggressive", "llm" seed: Optional[int] = None seller_personality: Optional[str] = None speed_ms: int = 0 # 0 = return all at once # LLM config (only used when strategy="llm") llm_provider: Optional[str] = None # "openai", "anthropic", "gemini", "huggingface", "grok" llm_api_key: Optional[str] = None llm_model: Optional[str] = None class SellerModeStepRequest(BaseModel): """User plays as seller: set your counteroffer price.""" price: float def _ai_buyer_action( obs: BazaarObservation, strategy: str, rng, *, client_ip: Optional[str] = None, ) -> BazaarAction: """Built-in AI buyer strategies for spectator / seller mode. `strategy` values: - "sauda" / "sauda_hf" → HF Inference Endpoint serving Sauda v2 - "sauda_ollama" → local ollama serving Sauda v2 - "smart" / "naive" / "aggressive" → rule-based heuristics (no LLM) `client_ip` is forwarded to the safety layer for per-IP rate-limiting on the metered HF backend; pass None for trusted server-internal callers. """ # Live Sauda v2 path (HF endpoint primary, Ollama fallback selectable) if strategy in ("sauda", "sauda_hf", "sauda_ollama"): from .sauda_buyer import sauda_action backend = "ollama" if strategy == "sauda_ollama" else "hf" obs_dict = obs.model_dump() if hasattr(obs, "model_dump") else obs.dict() result = sauda_action(obs_dict, backend=backend, client_ip=client_ip) action_str = result.get("action", "offer") price = result.get("price") msg = result.get("message", "") if action_str == "accept": ba = BazaarAction(action="accept") elif action_str == "walk": ba = BazaarAction(action="walk") else: ba = BazaarAction(action="offer", price=float(price) if price is not None else round((obs.own_private_budget or 100) * 0.3, 2)) # Smuggle the model's prose message + backend trace through a side channel # (BazaarAction has no message field; the route handler reads .sauda_message # off the action when present). try: object.__setattr__(ba, "sauda_message", msg) object.__setattr__(ba, "sauda_backend", result.get("backend", backend)) if result.get("error"): object.__setattr__(ba, "sauda_error", result["error"]) except Exception: pass return ba budget = obs.own_private_budget ask = obs.seller_asking_price opp = obs.opponent_last_offer or ask if strategy == "naive": if obs.current_round == 0: return BazaarAction(action="offer", price=round(ask * 0.8, 2)) if obs.current_round >= 2: return BazaarAction(action="accept") return BazaarAction(action="offer", price=round(ask * 0.85, 2)) elif strategy == "aggressive": target = budget * 0.35 if obs.current_round == 0: return BazaarAction(action="offer", price=round(target * 0.7, 2)) if opp <= target * 1.1: return BazaarAction(action="accept") if obs.rounds_remaining <= 1: return BazaarAction(action="walk") step_up = target * (0.7 + 0.05 * obs.current_round) return BazaarAction(action="offer", price=round(min(step_up, target), 2)) else: # smart if obs.current_round == 0: return BazaarAction(action="offer", price=round(ask * 0.4, 2)) seller_velocity = obs.seller_last_move_delta or 0 own_move = budget * 0.02 if seller_velocity > ask * 0.05 else budget * 0.05 last = obs.own_last_offer or (ask * 0.4) next_offer = last + own_move if obs.own_private_deadline and obs.current_round >= obs.own_private_deadline - 1: next_offer = min(opp * 0.95, budget * 0.7) if obs.current_round >= obs.own_private_deadline: return BazaarAction(action="accept") if opp <= budget * 0.55: return BazaarAction(action="accept") if obs.rounds_remaining <= 1 and opp > budget * 0.75: return BazaarAction(action="walk") if obs.rounds_remaining <= 1: return BazaarAction(action="accept") # Read tells if available if obs.tells and obs.tells.verbal_deception_cue > 0.4: next_offer *= 0.92 # hold firmer against bluffers if obs.career_history and obs.career_history.capitulation_rate > 0.3: next_offer *= 0.95 next_offer = max(next_offer, ask * 0.3) next_offer = min(next_offer, budget * 0.7) return BazaarAction(action="offer", price=round(next_offer, 2)) @app.get("/providers") async def list_providers(): """List available LLM providers and their models.""" from .llm import PROVIDERS return { name: { "name": p["name"], "models": p["models"], } for name, p in PROVIDERS.items() } @app.post("/simulate") async def simulate(req: SimulateRequest): """Run a full AI-vs-seller negotiation and return the complete history. Used for spectator mode — watch an AI agent negotiate in real-time. strategy="llm" uses an actual LLM via the specified provider. """ if req.task not in TASKS: raise HTTPException(status_code=400, detail=f"Unknown task: {req.task}") if req.strategy == "llm": if not req.llm_provider or not req.llm_api_key: raise HTTPException( status_code=400, detail="LLM strategy requires llm_provider and llm_api_key", ) task = copy.deepcopy(TASKS[req.task]) if req.seller_personality: task.seller_personality = SellerPersonalityType(req.seller_personality) env = BazaarEnvironment(task, seed=req.seed) _envs["spectator"] = env import random rng = random.Random(req.seed) steps = [] llm_history: list[str] = [] # conversation log for LLM context for ep in range(task.total_episodes): obs = env.reset() steps.append({ "round": 0, "episode": ep + 1, "actor": "seller", "action": "open", "price": obs.seller_asking_price, "message": obs.message, "reasoning": None, "reward": 0, "done": False, "tells": obs.tells.model_dump() if obs.tells else None, }) max_rounds = task.max_steps if task.total_episodes == 1 else task.max_steps // task.total_episodes for r in range(1, max_rounds + 1): if env.done: break reasoning = None if req.strategy == "llm": # Use actual LLM from .llm import call_llm obs_dict = obs.model_dump() llm_result = call_llm( provider=req.llm_provider, api_key=req.llm_api_key, model=req.llm_model, obs=obs_dict, history=llm_history, ) action_str = llm_result.get("action", "offer") price = llm_result.get("price") reasoning = llm_result.get("reasoning", "") action = BazaarAction(action=action_str, price=price) # Build history entry for next LLM call llm_history.append( f"Round {r}: You {'offered ' + str(price) if action_str == 'offer' else action_str}" f" -> Seller: {obs.message}" ) else: action = _ai_buyer_action(obs, req.strategy, rng) obs, reward_obj = env.step(action) steps.append({ "round": r, "episode": ep + 1, "actor": "buyer", "action": action.action.value if hasattr(action.action, 'value') else action.action, "price": action.price, "buyer_offer": action.price, "seller_offer": obs.opponent_last_offer, "message": obs.message, "reasoning": reasoning, "reward": reward_obj.reward, "reward_components": reward_obj.components, "done": obs.done, "outcome": obs.deal_outcome.value if obs.deal_outcome else None, "tells": obs.tells.model_dump() if obs.tells else None, }) if obs.done: break grader = GRADERS.get(task.name) final_score = grader(env.episode_results, task) if grader else 0.0 return { "steps": steps, "score": round(final_score, 4), "task": task.name, "strategy": req.strategy, "personality": task.seller_personality.value, "episodes": len(env.episode_results), "state": env.get_state().model_dump(), } # ── Seller mode (user plays as seller, AI is buyer) ───────────── class SellerModeResetRequest(BaseModel): task: str = "single_deal" strategy: str = "smart" seed: Optional[int] = None opening_price: float = 60.0 item_name: Optional[str] = None listing_price: Optional[float] = None # if user picked a real listing, this is its MRP @app.post("/seller-mode/reset") async def seller_mode_reset(req: SellerModeResetRequest, request: Request): """Start a seller-mode session. User plays as seller, AI plays as buyer.""" if req.task not in TASKS: raise HTTPException(status_code=400, detail=f"Unknown task: {req.task}") task = copy.deepcopy(TASKS[req.task]) # Tasks have hardcoded buyer_budget / seller_cost from synthetic examples. # When the user opens at a real-listing price ($2695 for an iPhone, $399 # for a sofa, etc) those numbers become nonsense and Sauda offers $30 on # a $2695 ask. Anchor the scale on the task's *opening price prior* — # buyer_budget = 1.67×ask in single_deal (60 → 100), and the relative # ratios (cost / budget ≈ 0.35, ask / budget ≈ 0.6) hold across tasks. # Derive sane budget/cost from the user's actual opening_price using those # ratios so the buyer's model of the deal scales with the listing. if req.opening_price and req.opening_price > 0: scaled_budget = float(req.opening_price) * 1.05 # buyer can stretch ~5% above ask scaled_cost = float(req.opening_price) * 0.35 # seller's true cost ~35% of ask else: scaled_budget = task.buyer_budget scaled_cost = task.seller_cost # Store seller mode state import random session = { "task": task, "strategy": req.strategy, "rng": random.Random(req.seed), "round": 0, "max_rounds": task.max_steps if task.total_episodes == 1 else task.max_steps // task.total_episodes, "buyer_budget": scaled_budget, "seller_cost": scaled_cost, "current_seller_price": req.opening_price, "last_buyer_offer": None, "history": [], "done": False, "outcome": None, } _envs["seller_mode"] = session # type: ignore # AI buyer sees the opening price obs = BazaarObservation( current_round=0, max_rounds=session["max_rounds"], opponent_last_offer=req.opening_price, own_private_budget=scaled_budget, rounds_remaining=session["max_rounds"], seller_asking_price=req.opening_price, item_name=req.item_name or "handwoven silk scarf", message=f"You open at {req.opening_price:.0f} rupees.", ) # AI buyer makes first offer client_ip = _client_ip(request) action = _ai_buyer_action(obs, req.strategy, session["rng"], client_ip=client_ip) session["round"] = 1 session["last_buyer_offer"] = action.price sauda_msg = getattr(action, "sauda_message", None) or "" sauda_backend = getattr(action, "sauda_backend", None) sauda_error = getattr(action, "sauda_error", None) session["history"].append({ "round": 0, "actor": "seller", "action": "open", "price": req.opening_price, }) session["history"].append({ "round": 1, "actor": "buyer", "action": action.action.value if hasattr(action.action, 'value') else action.action, "price": action.price, "message": sauda_msg, }) fallback_msg = ( f"Buyer offers {action.price:.0f} rupees." if action.action in ("offer", "OFFER", ActionType.OFFER) else f"Buyer {action.action}s." ) return { "round": 1, "buyer_action": action.action.value if hasattr(action.action, 'value') else action.action, "buyer_price": action.price, "message": sauda_msg or fallback_msg, "buyer_message": sauda_msg, "your_opening": req.opening_price, "history": session["history"], "done": False, } @app.post("/seller-mode/step") async def seller_mode_step(req: SellerModeStepRequest, request: Request): """User (as seller) sets counteroffer price. AI buyer responds.""" if "seller_mode" not in _envs: raise HTTPException(status_code=400, detail="No seller-mode session. Call /seller-mode/reset first.") session = _envs["seller_mode"] if session["done"]: return {"message": "Negotiation is over.", "done": True, "history": session["history"]} seller_price = req.price session["current_seller_price"] = seller_price session["round"] += 1 rnd = session["round"] session["history"].append({ "round": rnd, "actor": "seller", "action": "counter", "price": seller_price, }) # Check if seller accepted buyer's offer (seller price <= buyer's offer) if session["last_buyer_offer"] is not None and seller_price <= session["last_buyer_offer"]: session["done"] = True session["outcome"] = "deal" agreed = session["last_buyer_offer"] surplus = session["buyer_budget"] - agreed max_surplus = session["buyer_budget"] - session["seller_cost"] buyer_score = max(0, surplus / max_surplus) if max_surplus > 0 else 0 return { "round": rnd, "message": f"You accepted the buyer's offer of {agreed:.0f}! Deal closed.", "buyer_action": "deal", "buyer_price": agreed, "done": True, "outcome": "deal", "agreed_price": agreed, "buyer_score": round(buyer_score, 4), "seller_profit": agreed - session["seller_cost"], "history": session["history"], } # Build observation for AI buyer obs = BazaarObservation( current_round=rnd, max_rounds=session["max_rounds"], own_last_offer=session["last_buyer_offer"], opponent_last_offer=seller_price, own_private_budget=session["buyer_budget"], rounds_remaining=max(0, session["max_rounds"] - rnd), seller_asking_price=session["history"][0]["price"], item_name="handwoven silk scarf", message=f"Seller counters: {seller_price:.0f} rupees.", ) # Check expired if rnd >= session["max_rounds"]: session["done"] = True session["outcome"] = "expired" return { "round": rnd, "message": "Time's up! No deal reached.", "buyer_action": "expired", "buyer_price": None, "done": True, "outcome": "expired", "history": session["history"], } # AI buyer responds client_ip = _client_ip(request) action = _ai_buyer_action(obs, session["strategy"], session["rng"], client_ip=client_ip) if action.action in ("accept", ActionType.ACCEPT): session["done"] = True session["outcome"] = "deal" agreed = seller_price surplus = session["buyer_budget"] - agreed max_surplus = session["buyer_budget"] - session["seller_cost"] buyer_score = max(0, surplus / max_surplus) if max_surplus > 0 else 0 sauda_msg = getattr(action, "sauda_message", None) or "" sauda_backend = getattr(action, "sauda_backend", None) session["history"].append({ "round": rnd, "actor": "buyer", "action": "accept", "price": seller_price, "message": sauda_msg, }) return { "round": rnd, "message": sauda_msg or f"Buyer accepts your price of {seller_price:.0f}! Deal closed.", "buyer_message": sauda_msg, "buyer_action": "accept", "buyer_price": seller_price, "done": True, "outcome": "deal", "agreed_price": seller_price, "buyer_score": round(buyer_score, 4), "seller_profit": seller_price - session["seller_cost"], "history": session["history"], } elif action.action in ("walk", ActionType.WALK): session["done"] = True session["outcome"] = "walk" sauda_msg = getattr(action, "sauda_message", None) or "" sauda_backend = getattr(action, "sauda_backend", None) session["history"].append({ "round": rnd, "actor": "buyer", "action": "walk", "price": None, "message": sauda_msg, }) return { "round": rnd, "message": sauda_msg or "Buyer walks away! No deal.", "buyer_message": sauda_msg, "buyer_action": "walk", "buyer_price": None, "done": True, "outcome": "walk", "history": session["history"], } else: # offer session["last_buyer_offer"] = action.price sauda_msg = getattr(action, "sauda_message", None) or "" sauda_backend = getattr(action, "sauda_backend", None) sauda_error = getattr(action, "sauda_error", None) session["history"].append({ "round": rnd, "actor": "buyer", "action": "offer", "price": action.price, "message": sauda_msg, }) return { "round": rnd, "message": sauda_msg or f"Buyer counters with {action.price:.0f} rupees.", "buyer_message": sauda_msg, "buyer_action": "offer", "buyer_price": action.price, "done": False, "history": session["history"], } # ── WebSocket ──────────────────────────────────────────────────── @app.websocket("/ws/{session_id}") async def websocket_endpoint(websocket: WebSocket, session_id: str = "default"): await websocket.accept() if session_id not in _ws_connections: _ws_connections[session_id] = [] _ws_connections[session_id].append(websocket) try: # Send current state if session exists if session_id in _envs: env = _envs[session_id] await websocket.send_json({ "event": "connected", "state": env.get_state().model_dump(), }) else: await websocket.send_json({"event": "connected", "state": None}) # Keep alive and handle client messages while True: data = await websocket.receive_text() msg = json.loads(data) if msg.get("type") == "ping": await websocket.send_json({"event": "pong"}) elif msg.get("type") == "step": # Allow stepping via WebSocket too env = _get_env(session_id) action = BazaarAction(action=msg["action"], price=msg.get("price")) obs, reward_obj = env.step(action) response = { "event": "step", "round": env.current_round, "observation": obs.model_dump(), "reward": reward_obj.reward, "reward_components": reward_obj.components, "done": obs.done, } # Broadcast to all watchers await _broadcast(session_id, "step", response) except WebSocketDisconnect: _ws_connections[session_id].remove(websocket) # ── Leaderboard ────────────────────────────────────────────────── @app.get("/leaderboard") async def leaderboard(task: Optional[str] = None, limit: int = 50, offset: int = 0) -> LeaderboardResponse: return get_leaderboard(task=task, limit=limit, offset=offset) @app.get("/leaderboard/best") async def leaderboard_best(): return get_best_scores() @app.post("/leaderboard/record") async def leaderboard_record(req: RecordScoreRequest) -> LeaderboardEntry: env = _get_env() task = env.task grader = GRADERS.get(task.name) if not grader: raise HTTPException(status_code=400, detail=f"No grader for task: {task.name}") final_score = grader(env.episode_results, task) return record_score( agent_name=req.agent_name, task=task.name, score=round(final_score, 4), episodes_completed=len(env.episode_results), metadata=req.metadata, ) # ── Counterfactual analysis ────────────────────────────────────── @app.post("/counterfactual") async def counterfactual(req: CounterfactualRequest) -> CounterfactualResult: """Replay from a decision point with a different action. Uses environment snapshots to fork the negotiation at any round and explore 'what if I had offered X instead?' """ env = _get_env(req.session_id) # Save original results original_results = list(env.episode_results) original_outcome = original_results[-1].outcome if original_results else None original_price = original_results[-1].agreed_price if original_results else None original_grader = GRADERS.get(env.task.name) original_score = original_grader(original_results, env.task) if original_grader else 0.0 # Create a copy of the environment and restore to the fork point cf_env = copy.deepcopy(env) if not cf_env.restore_snapshot(req.from_round): raise HTTPException( status_code=400, detail=f"No snapshot at round {req.from_round}. Available: {list(env._snapshots.keys())}", ) # Execute the alternative action alt_action = BazaarAction(action=req.alternative_action, price=req.alternative_price) cf_history = [] obs, reward = cf_env.step(alt_action) cf_history.append({ "round": cf_env.current_round, "action": req.alternative_action.value, "price": req.alternative_price, "seller_response": obs.message, "reward": reward.reward, "done": obs.done, }) # Continue with a simple greedy strategy for remaining rounds while not cf_env.done and cf_env.current_round < cf_env.seller.max_rounds: if obs.opponent_last_offer and obs.opponent_last_offer <= cf_env.buyer_budget * 0.6: action = BazaarAction(action="accept") else: offer_price = (obs.opponent_last_offer or cf_env.seller.anchor) * 0.85 offer_price = min(offer_price, cf_env.buyer_budget * 0.7) action = BazaarAction(action="offer", price=round(offer_price, 2)) obs, reward = cf_env.step(action) cf_history.append({ "round": cf_env.current_round, "action": action.action.value, "price": action.price, "seller_response": obs.message, "reward": reward.reward, "done": obs.done, }) cf_results = cf_env.episode_results cf_outcome = cf_results[-1].outcome if cf_results else None cf_price = cf_results[-1].agreed_price if cf_results else None cf_score = original_grader(cf_results, cf_env.task) if original_grader else 0.0 return CounterfactualResult( original_outcome=original_outcome, original_price=original_price, original_score=round(original_score, 4), counterfactual_outcome=cf_outcome, counterfactual_price=cf_price, counterfactual_score=round(cf_score, 4), divergence_round=req.from_round, counterfactual_history=cf_history, ) # ── Multi-buyer Arena ─────────────────────────────────────────── @app.post("/arena/create") async def arena_create(req: ArenaCreateRequest): if req.task not in TASKS: raise HTTPException(status_code=400, detail=f"Unknown task: {req.task}") task = copy.deepcopy(TASKS[req.task]) task.num_buyers = req.num_buyers arena = MultiBuyerArena(task, seed=req.seed) arena_id = arena.arena_id _arenas[arena_id] = arena return {"arena_id": arena_id, "num_buyers": req.num_buyers, "task": req.task} @app.post("/arena/{arena_id}/join") async def arena_join(arena_id: str, req: ArenaJoinRequest): if arena_id not in _arenas: raise HTTPException(status_code=404, detail="Arena not found") arena = _arenas[arena_id] try: buyer = arena.add_buyer(req.buyer_id, req.name, req.is_human) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) return {"buyer": buyer.model_dump(), "arena_id": arena_id, "total_buyers": len(arena.buyers)} @app.post("/arena/{arena_id}/reset") async def arena_reset(arena_id: str): if arena_id not in _arenas: raise HTTPException(status_code=404, detail="Arena not found") arena = _arenas[arena_id] if len(arena.buyers) < 2: raise HTTPException(status_code=400, detail="Need at least 2 buyers to start") observations = arena.reset() return {bid: obs.model_dump() for bid, obs in observations.items()} @app.post("/arena/{arena_id}/step") async def arena_step(arena_id: str, req: ArenaStepRequest): if arena_id not in _arenas: raise HTTPException(status_code=404, detail="Arena not found") arena = _arenas[arena_id] actions = {} for bid, act_dict in req.actions.items(): actions[bid] = ArenaAction( buyer_id=bid, action=act_dict.get("action", "offer"), price=act_dict.get("price"), signal=act_dict.get("signal"), ) observations = arena.step(actions) await _broadcast(f"arena_{arena_id}", "arena_step", { "round": arena.current_round, "done": arena.done, "winner": arena.winner, }) return {bid: obs.model_dump() for bid, obs in observations.items()} @app.get("/arena/{arena_id}/state") async def arena_state(arena_id: str) -> ArenaState: if arena_id not in _arenas: raise HTTPException(status_code=404, detail="Arena not found") return _arenas[arena_id].get_state()