From de4862b2d1d5f0b0221635121532957c81104d86 Mon Sep 17 00:00:00 2001 From: Daniel Busse Date: Tue, 16 Jun 2026 22:13:09 -0600 Subject: [PATCH] Initial commit - steps 1-6 complete --- .gitignore | 21 ++++++ archidekt.py | 160 ++++++++++++++++++++++++++++++++++++++++ claude_client.py | 142 +++++++++++++++++++++++++++++++++++ deck_service.py | 130 ++++++++++++++++++++++++++++++++ manabox.py | 99 +++++++++++++++++++++++++ prompts.py | 150 +++++++++++++++++++++++++++++++++++++ scryfall.py | 187 +++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 889 insertions(+) create mode 100644 .gitignore create mode 100644 archidekt.py create mode 100644 claude_client.py create mode 100644 deck_service.py create mode 100644 manabox.py create mode 100644 prompts.py create mode 100644 scryfall.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a463347 --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# Secrets +.env + +# Python +__pycache__/ +*.pyc +*.pyo +.venv/ +venv/ + +# Frontend build +frontend/node_modules/ +frontend/dist/ + +# Logs & misc +*.log +.DS_Store + +# SSL certs (generated on server) +certbot/ +nginx/certbot/ \ No newline at end of file diff --git a/archidekt.py b/archidekt.py new file mode 100644 index 0000000..4582db7 --- /dev/null +++ b/archidekt.py @@ -0,0 +1,160 @@ +""" +Archidekt collection importer. + +Supports two export formats: + - CSV (Collection -> Export -> CSV) + - JSON (Collection -> Export -> JSON) + +CSV columns (current Archidekt format, as of 2024): + Quantity, Foil Quantity, Card Name, Set Code, Collector Number, ... + +JSON format (array of card objects): + [{"quantity": 1, "foilQuantity": 0, "card": {"name": "...", "set": {"code": "..."}, "collectorNumber": "..."}}] +""" +import csv +import io +import json +from typing import Union + +import chardet + +from app.services.imports.models import RawCardRow + +SOURCE = "archidekt" + +# CSV column name aliases — Archidekt has changed these over time +_NAME_COLS = {"card name", "name", "cardname"} +_QTY_COLS = {"quantity", "qty", "count", "amount"} +_FOIL_COLS = {"foil quantity", "foil qty", "foilqty", "foilcount", "foils"} +_SET_COLS = {"set code", "set", "setcode", "edition"} +_COLLECTOR_COLS = {"collector number", "collector #", "collectornumber", "number"} + + +def parse(raw_bytes: bytes) -> list[RawCardRow]: + """ + Auto-detect CSV vs JSON and dispatch to the appropriate parser. + Raises ValueError with a human-readable message on unrecognised format. + """ + # Detect encoding + detected = chardet.detect(raw_bytes) + encoding = detected.get("encoding") or "utf-8" + text = raw_bytes.decode(encoding, errors="replace").strip() + + if text.startswith("[") or text.startswith("{"): + return _parse_json(text) + else: + return _parse_csv(text) + + +def _parse_csv(text: str) -> list[RawCardRow]: + reader = csv.DictReader(io.StringIO(text)) + if not reader.fieldnames: + raise ValueError("Archidekt CSV appears to be empty or has no header row") + + # Normalise header names + headers = {h.strip().lower(): h for h in reader.fieldnames if h} + + name_col = _find_col(headers, _NAME_COLS, "Card Name") + qty_col = _find_col(headers, _QTY_COLS, "Quantity") + foil_col = _find_col(headers, _FOIL_COLS, None, required=False) + set_col = _find_col(headers, _SET_COLS, None, required=False) + collect_col = _find_col(headers, _COLLECTOR_COLS, None, required=False) + + rows: list[RawCardRow] = [] + for i, row in enumerate(reader, start=2): # start=2 because row 1 is header + name = row.get(name_col, "").strip() + if not name: + continue + + try: + qty = int(float(row.get(qty_col, "1") or "1")) + except (ValueError, TypeError): + qty = 1 + + try: + foil_qty = int(float(row.get(foil_col, "0") or "0")) if foil_col else 0 + except (ValueError, TypeError): + foil_qty = 0 + + rows.append(RawCardRow( + card_name=name, + set_code=(row.get(set_col, "") or "").strip().lower() if set_col else "", + collector_number=(row.get(collect_col, "") or "").strip() if collect_col else "", + quantity=qty, + foil_quantity=foil_qty, + import_source=SOURCE, + raw=dict(row), + )) + + if not rows: + raise ValueError("Archidekt CSV contained no card rows") + + return rows + + +def _parse_json(text: str) -> list[RawCardRow]: + try: + data = json.loads(text) + except json.JSONDecodeError as e: + raise ValueError(f"Archidekt JSON is malformed: {e}") + + # Handle both bare array and {"cards": [...]} wrapper + if isinstance(data, dict): + data = data.get("cards") or data.get("collection") or [] + if not isinstance(data, list): + raise ValueError("Archidekt JSON must be an array of card objects") + + rows: list[RawCardRow] = [] + for item in data: + card = item.get("card") or item # nested or flat + name = ( + card.get("name") + or card.get("cardName") + or card.get("card_name") + or "" + ).strip() + if not name: + continue + + qty = int(item.get("quantity") or item.get("qty") or 1) + foil_qty = int(item.get("foilQuantity") or item.get("foil_quantity") or 0) + + set_obj = card.get("set") or {} + set_code = ( + set_obj.get("code") if isinstance(set_obj, dict) else str(set_obj) + ).lower().strip() + + collector = str(card.get("collectorNumber") or card.get("collector_number") or "").strip() + + rows.append(RawCardRow( + card_name=name, + set_code=set_code, + collector_number=collector, + quantity=qty, + foil_quantity=foil_qty, + import_source=SOURCE, + raw=item, + )) + + if not rows: + raise ValueError("Archidekt JSON contained no card entries") + + return rows + + +def _find_col( + headers: dict[str, str], + aliases: set[str], + fallback: str | None, + required: bool = True, +) -> str | None: + for alias in aliases: + if alias in headers: + return headers[alias] + if fallback and fallback.lower() in headers: + return headers[fallback.lower()] + if required: + raise ValueError( + f"Could not find required column. Expected one of: {sorted(aliases)}" + ) + return None diff --git a/claude_client.py b/claude_client.py new file mode 100644 index 0000000..3c2d275 --- /dev/null +++ b/claude_client.py @@ -0,0 +1,142 @@ +""" +Claude API client for deck generation. + +Responsibilities: + 1. Call the Anthropic API with the appropriate prompt + 2. Parse and validate the JSON response + 3. Strip [UNOWNED] markers and track ownership + 4. Validate card names against Scryfall (batch) + 5. Return a structured DeckPayload ready to persist +""" +import json +import re +from typing import Optional + +import anthropic + +from app.core.config import settings +from app.models.deck import CardSlot +from app.services import scryfall + +_client: anthropic.AsyncAnthropic | None = None + + +def _get_client() -> anthropic.AsyncAnthropic: + global _client + if _client is None: + _client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY) + return _client + + +async def call_claude( + system_prompt: str, + user_message: str, + max_tokens: int = 8000, +) -> "DeckPayload": + client = _get_client() + message = await client.messages.create( + model=settings.ANTHROPIC_MODEL, + max_tokens=max_tokens, + system=system_prompt, + messages=[{"role": "user", "content": user_message}], + ) + raw_text = _extract_text(message) + deck_json = _parse_json(raw_text) + payload = _build_payload(deck_json) + await _enrich_with_scryfall(payload) + return payload + + +UNOWNED_RE = re.compile(r"\s*\[UNOWNED\]\s*$", re.IGNORECASE) + + +def _extract_text(message) -> str: + for block in message.content: + if block.type == "text": + return block.text.strip() + raise ValueError("Claude returned no text content") + + +def _parse_json(text: str) -> dict: + text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.MULTILINE) + text = re.sub(r"\s*```$", "", text, flags=re.MULTILINE) + try: + return json.loads(text.strip()) + except json.JSONDecodeError as e: + raise ValueError(f"Claude response was not valid JSON: {e}\n\nRaw: {text[:500]}") + + +def _parse_slot(raw: str) -> CardSlot: + normalised = raw.strip().lower() + try: + return CardSlot(normalised) + except ValueError: + mapping = { + "lands": CardSlot.LAND, + "creatures": CardSlot.CREATURE, + "instants": CardSlot.INSTANT, + "sorceries": CardSlot.SORCERY, + "artifacts": CardSlot.ARTIFACT, + "enchantments": CardSlot.ENCHANTMENT, + "planeswalkers": CardSlot.PLANESWALKER, + } + return mapping.get(normalised, CardSlot.CREATURE) + + +class CardEntry: + def __init__(self, raw: dict): + raw_name: str = raw.get("name", "").strip() + self.is_owned: bool = not bool(UNOWNED_RE.search(raw_name)) + self.card_name: str = UNOWNED_RE.sub("", raw_name).strip() + self.slot: CardSlot = _parse_slot(raw.get("slot", "creature")) + self.quantity: int = max(1, int(raw.get("quantity", 1))) + self.reasoning: Optional[str] = raw.get("reasoning") + self.scryfall_id: str = "" + self.scryfall_data: dict = {} + + +class CutEntry: + def __init__(self, raw: dict): + self.card_name: str = raw.get("name", "").strip() + self.reasoning: Optional[str] = raw.get("reasoning") + + +class DeckPayload: + def __init__(self): + self.deck_name: str = "" + self.strategy_summary: str = "" + self.cards: list[CardEntry] = [] + self.cuts: list[CutEntry] = [] + self.unresolved: list[str] = [] + + +def _build_payload(data: dict) -> DeckPayload: + payload = DeckPayload() + payload.deck_name = data.get("deck_name", "Untitled Deck") + payload.strategy_summary = data.get("strategy_summary", "") + for raw_card in data.get("cards", []): + if raw_card.get("name"): + payload.cards.append(CardEntry(raw_card)) + for raw_cut in data.get("cuts", []): + if raw_cut.get("name"): + payload.cuts.append(CutEntry(raw_cut)) + return payload + + +async def _enrich_with_scryfall(payload: DeckPayload) -> None: + names = [c.card_name for c in payload.cards] + name_map = await scryfall.batch_enrich_by_name(names) + + for entry in payload.cards: + sf_data = name_map.get(entry.card_name.lower()) + if sf_data: + entry.scryfall_id = sf_data["id"] + entry.scryfall_data = sf_data + else: + sf_data = await scryfall.get_card_by_name(entry.card_name, exact=False) + if sf_data: + entry.scryfall_id = sf_data["id"] + entry.scryfall_data = sf_data + entry.card_name = sf_data["name"] + else: + payload.unresolved.append(entry.card_name) diff --git a/deck_service.py b/deck_service.py new file mode 100644 index 0000000..a7a3f39 --- /dev/null +++ b/deck_service.py @@ -0,0 +1,130 @@ +""" +Deck service: orchestrates prompt building, Claude call, and DB persistence. +""" +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select + +from app.models.collection import CollectionCard +from app.models.deck import Deck, DeckCard, DeckMode +from app.schemas.deck import DeckConstraints, GenerateRequest, CompleteRequest, CullRequest +from app.services.ai import prompts +from app.services.ai.claude_client import call_claude, DeckPayload +from app.services.ai.constraints import build_constraint_context, build_owned_card_list + + +async def generate_deck(req: GenerateRequest, user_id: int, db: AsyncSession) -> Deck: + owned_names = await _owned_names(user_id, db) if req.constraints.prefer_owned else [] + system, user_msg = prompts.generate_prompt( + commander=req.commander, + playstyle=req.playstyle, + constraint_text=build_constraint_context(req.constraints, owned_names or None), + owned_list_text=build_owned_card_list(owned_names), + ) + payload = await call_claude(system, user_msg) + return await _persist_deck( + payload=payload, user_id=user_id, mode=DeckMode.GENERATE, + commander=req.commander, name=req.name or payload.deck_name, + description=req.description or payload.strategy_summary, + playstyle=req.playstyle, constraints=req.constraints, + owned_name_set=set(n.lower() for n in owned_names), db=db, + ) + + +async def complete_deck(req: CompleteRequest, user_id: int, db: AsyncSession) -> Deck: + owned_names = await _owned_names(user_id, db) if req.constraints.prefer_owned else [] + system, user_msg = prompts.complete_prompt( + commander=req.commander, playstyle=req.playstyle, + existing_cards=req.existing_cards, + constraint_text=build_constraint_context(req.constraints, owned_names or None), + owned_list_text=build_owned_card_list(owned_names), + ) + payload = await call_claude(system, user_msg) + return await _persist_deck( + payload=payload, user_id=user_id, mode=DeckMode.COMPLETE, + commander=req.commander, name=req.name or payload.deck_name, + description=payload.strategy_summary, playstyle=req.playstyle, + constraints=req.constraints, + owned_name_set=set(n.lower() for n in owned_names), db=db, + ) + + +async def cull_deck(req: CullRequest, user_id: int, db: AsyncSession) -> Deck: + owned_names = await _owned_names(user_id, db) if req.constraints.prefer_owned else [] + owned_set = set(n.lower() for n in owned_names) + for card in req.existing_cards: + card["is_owned"] = card.get("card_name", "").lower() in owned_set + + system, user_msg = prompts.cull_prompt( + commander=req.commander, existing_cards=req.existing_cards, + target_count=req.target_count, + constraint_text=build_constraint_context(req.constraints, owned_names or None), + owned_list_text=build_owned_card_list(owned_names), + prefer_owned=req.constraints.prefer_owned, + ) + payload = await call_claude(system, user_msg, max_tokens=10000) + return await _persist_deck( + payload=payload, user_id=user_id, mode=DeckMode.CULL, + commander=req.commander, name=req.name or payload.deck_name, + description=payload.strategy_summary, playstyle=None, + constraints=req.constraints, owned_name_set=owned_set, db=db, + ) + + +async def _persist_deck( + payload: DeckPayload, user_id: int, mode: DeckMode, + commander: str, name: str, description: str | None, + playstyle: str | None, constraints: DeckConstraints, + owned_name_set: set[str], db: AsyncSession, +) -> Deck: + deck = Deck( + owner_id=user_id, name=name, commander=commander, + description=description, mode=mode, playstyle=playstyle, + prefer_owned=constraints.prefer_owned, + budget_enabled=constraints.budget_enabled, + budget_amount=constraints.budget_amount, + budget_scope=constraints.budget_scope, + ai_reasoning={ + "strategy_summary": payload.strategy_summary, + "unresolved_cards": payload.unresolved, + "cuts": [{"name": c.card_name, "reasoning": c.reasoning} for c in payload.cuts], + }, + ) + db.add(deck) + await db.flush() + + deck_cards = [ + DeckCard( + deck_id=deck.id, scryfall_id="", card_name=commander, + slot="creature", quantity=1, + is_owned=commander.lower() in owned_name_set, + is_commander=True, + ) + ] + + for entry in payload.cards: + if not entry.scryfall_id: + continue + is_owned = ( + entry.card_name.lower() in owned_name_set + if constraints.prefer_owned + else entry.is_owned + ) + deck_cards.append(DeckCard( + deck_id=deck.id, scryfall_id=entry.scryfall_id, + card_name=entry.card_name, slot=entry.slot, + quantity=entry.quantity, is_owned=is_owned, + is_commander=False, ai_reasoning=entry.reasoning, + scryfall_data=entry.scryfall_data, + )) + + db.add_all(deck_cards) + await db.commit() + await db.refresh(deck) + return deck + + +async def _owned_names(user_id: int, db: AsyncSession) -> list[str]: + result = await db.execute( + select(CollectionCard.card_name).where(CollectionCard.owner_id == user_id) + ) + return [row[0] for row in result.all()] diff --git a/manabox.py b/manabox.py new file mode 100644 index 0000000..48698c0 --- /dev/null +++ b/manabox.py @@ -0,0 +1,99 @@ +""" +Manabox collection importer. + +Manabox exports a single CSV format: + Name, Set code, Collector number, Foil, Quantity, ... + +The "Foil" column is a boolean ("Yes"/"No" or "1"/"0") applied per-row. +Manabox creates separate rows for foil and non-foil copies of the same card, +so we merge them into a single RawCardRow (quantity + foil_quantity). +""" +import csv +import io +from collections import defaultdict + +import chardet + +from app.services.imports.models import RawCardRow + +SOURCE = "manabox" + +_NAME_COLS = {"name", "card name", "cardname", "card_name"} +_SET_COLS = {"set code", "set", "setcode", "set_code", "edition code"} +_COLLECTOR_COLS = {"collector number", "collector #", "collectornumber", "number"} +_FOIL_COLS = {"foil", "is foil", "isfoil"} +_QTY_COLS = {"quantity", "qty", "count", "amount"} + + +def parse(raw_bytes: bytes) -> list[RawCardRow]: + detected = chardet.detect(raw_bytes) + encoding = detected.get("encoding") or "utf-8" + text = raw_bytes.decode(encoding, errors="replace").strip() + + reader = csv.DictReader(io.StringIO(text)) + if not reader.fieldnames: + raise ValueError("Manabox CSV appears to be empty or has no header row") + + headers = {h.strip().lower(): h for h in reader.fieldnames if h} + + name_col = _find_col(headers, _NAME_COLS, "Name") + set_col = _find_col(headers, _SET_COLS, None, required=False) + collect_col = _find_col(headers, _COLLECTOR_COLS, None, required=False) + foil_col = _find_col(headers, _FOIL_COLS, None, required=False) + qty_col = _find_col(headers, _QTY_COLS, "Quantity") + + # Key: (card_name, set_code, collector_number) → merged row + merged: dict[tuple, RawCardRow] = defaultdict(lambda: RawCardRow(import_source=SOURCE)) + + for row in reader: + name = row.get(name_col, "").strip() + if not name: + continue + + set_code = (row.get(set_col, "") or "").strip().lower() if set_col else "" + collector = (row.get(collect_col, "") or "").strip() if collect_col else "" + key = (name.lower(), set_code, collector) + + try: + qty = int(float(row.get(qty_col, "1") or "1")) + except (ValueError, TypeError): + qty = 1 + + is_foil = _parse_bool(row.get(foil_col, "")) if foil_col else False + + entry = merged[key] + entry.card_name = name + entry.set_code = set_code + entry.collector_number = collector + + if is_foil: + entry.foil_quantity += qty + else: + entry.quantity += qty + + if not merged: + raise ValueError("Manabox CSV contained no card rows") + + return list(merged.values()) + + +def _parse_bool(value: str) -> bool: + return str(value).strip().lower() in {"yes", "true", "1", "y"} + + +def _find_col( + headers: dict[str, str], + aliases: set[str], + fallback: str | None, + required: bool = True, +) -> str | None: + for alias in aliases: + if alias in headers: + return headers[alias] + if fallback and fallback.lower() in headers: + return headers[fallback.lower()] + if required: + raise ValueError( + f"Could not find required column. Expected one of: {sorted(aliases)}" + ) + return None diff --git a/prompts.py b/prompts.py new file mode 100644 index 0000000..d35b7af --- /dev/null +++ b/prompts.py @@ -0,0 +1,150 @@ +""" +Prompt templates for the three deck modes. + +Output contract — Claude returns a single JSON object: +{ + "deck_name": "string", + "strategy_summary": "string", + "cards": [ + { + "name": "Card Name", // append [UNOWNED] when prefer_owned=True + "slot": "creature|instant|sorcery|enchantment|artifact|planeswalker|land|battle", + "quantity": 1, + "reasoning": "1-2 sentences" + } + ], + "cuts": [ // CULL mode only + { "name": "Card Name", "reasoning": "..." } + ] +} +""" + +SYSTEM_PROMPT = ( + "You are an expert Magic: The Gathering deck builder specialising in the Commander " + "(EDH) format. You have deep knowledge of card synergies, mana curves, colour " + "identity rules, staples, budget alternatives, and current metagame trends.\n\n" + "You always respond with a single valid JSON object — no markdown fences, no " + "preamble, no commentary outside the JSON. Your card names must exactly match " + "official Magic card names (English). Every card must be legal in Commander " + "and within the commander's colour identity." +) + + +def generate_prompt( + commander: str, + playstyle: str | None, + constraint_text: str, + owned_list_text: str, +) -> tuple[str, str]: + playstyle_line = ( + f"Playstyle preference: {playstyle}" + if playstyle + else "Playstyle preference: not specified — choose the strongest strategy for this commander." + ) + + user_message = ( + f"Build a complete Commander deck for the following commander.\n\n" + f"COMMANDER: {commander}\n" + f"{playstyle_line}\n\n" + f"CONSTRAINTS:\n{constraint_text}\n" + f"{owned_list_text}\n" + f"DECK REQUIREMENTS:\n" + f"- Exactly 99 cards (not counting the commander)\n" + f"- All cards must be legal in Commander and within {commander}'s colour identity\n" + f"- Include a balanced mana base (35-40 lands for most strategies)\n" + f"- Include ramp (8-12 pieces), card draw (8-10 pieces), removal (8-10 pieces), " + f"and win conditions appropriate to the playstyle\n" + f"- Slot values: creature, instant, sorcery, enchantment, artifact, planeswalker, land, battle\n" + f"- Quantity for basic lands may be >1; all other cards quantity = 1\n" + f"- The 'reasoning' field must explain why the card fits THIS specific deck\n\n" + f"Respond with the JSON object only." + ) + return SYSTEM_PROMPT, user_message + + +def complete_prompt( + commander: str, + playstyle: str | None, + existing_cards: list[dict], + constraint_text: str, + owned_list_text: str, +) -> tuple[str, str]: + existing_count = sum(c.get("quantity", 1) for c in existing_cards) + slots_needed = 99 - existing_count + + playstyle_line = ( + f"Playstyle preference: {playstyle}" + if playstyle + else "Playstyle preference: infer from existing cards." + ) + + existing_formatted = "\n".join( + f"- {c['card_name']} ({c.get('slot', 'unknown')})" + + (f" x{c['quantity']}" if c.get('quantity', 1) > 1 else "") + for c in existing_cards + ) + + user_message = ( + f"The user has a partial Commander deck and needs suggestions to complete it.\n\n" + f"COMMANDER: {commander}\n" + f"{playstyle_line}\n" + f"SLOTS NEEDED: {slots_needed} more cards to reach 99\n\n" + f"EXISTING CARDS ({existing_count} cards):\n{existing_formatted}\n\n" + f"CONSTRAINTS:\n{constraint_text}\n" + f"{owned_list_text}\n" + f"INSTRUCTIONS:\n" + f"- Suggest exactly {slots_needed} new cards to fill remaining slots\n" + f"- Do not repeat any card already in the existing list\n" + f"- All cards must be legal in Commander and within {commander}'s colour identity\n" + f"- Analyse existing cards to infer strategy and fill gaps (ramp, draw, removal, win-cons)\n" + f"- The 'cards' array contains ONLY the new cards you are recommending\n" + f"- strategy_summary should describe how the completed deck plays\n\n" + f"Respond with the JSON object only." + ) + return SYSTEM_PROMPT, user_message + + +def cull_prompt( + commander: str, + existing_cards: list[dict], + target_count: int, + constraint_text: str, + owned_list_text: str, + prefer_owned: bool, +) -> tuple[str, str]: + current_count = sum(c.get("quantity", 1) for c in existing_cards) + cuts_needed = current_count - target_count + + existing_formatted = "\n".join( + f"- {c['card_name']} ({c.get('slot', 'unknown')})" + + (" [OWNED]" if c.get("is_owned") else "") + for c in existing_cards + ) + + ownership_note = ( + "\n- IMPORTANT: Prioritise cutting cards NOT marked [OWNED] first — " + "this saves the user money on cards they would have to buy." + if prefer_owned + else "" + ) + + user_message = ( + f"The user's Commander deck is oversized and needs to be culled.\n\n" + f"COMMANDER: {commander}\n" + f"CURRENT SIZE: {current_count} cards\n" + f"TARGET SIZE: {target_count} cards\n" + f"CUTS NEEDED: {cuts_needed} cards\n\n" + f"CURRENT DECKLIST:\n{existing_formatted}\n\n" + f"CONSTRAINTS:\n{constraint_text}\n" + f"{owned_list_text}\n" + f"INSTRUCTIONS:\n" + f"- Recommend exactly {cuts_needed} cards to cut\n" + f"- Identify redundancy, weak synergy, overcosted cards, and cards that " + f"don't advance the primary strategy\n" + f"- Order the 'cuts' array from most-recommended to least-recommended cut{ownership_note}\n" + f"- The 'cards' array contains all {target_count} REMAINING cards after cuts\n" + f"- Provide specific reasoning for each cut explaining why it's weaker than what stays\n" + f"- strategy_summary describes the refined deck after cuts\n\n" + f"Respond with the JSON object only." + ) + return SYSTEM_PROMPT, user_message diff --git a/scryfall.py b/scryfall.py new file mode 100644 index 0000000..ecc1942 --- /dev/null +++ b/scryfall.py @@ -0,0 +1,187 @@ +""" +Scryfall API client with Redis caching. + +Respects Scryfall's rate limit (10 req/s with a small safety margin). +Cards are cached for 24 hours by default (SCRYFALL_CACHE_TTL_SECONDS). +""" +import asyncio +import json +from typing import Optional + +import httpx +from tenacity import retry, stop_after_attempt, wait_exponential + +from app.core.config import settings +from app.core.redis import get_redis + +SCRYFALL_BASE = "https://api.scryfall.com" +_semaphore: asyncio.Semaphore | None = None + + +def _get_semaphore() -> asyncio.Semaphore: + global _semaphore + if _semaphore is None: + _semaphore = asyncio.Semaphore(settings.SCRYFALL_RATE_LIMIT_RPS) + return _semaphore + + +def _cache_key(kind: str, value: str) -> str: + return f"scryfall:{kind}:{value.lower()}" + + +async def _cache_get(key: str) -> Optional[dict]: + redis = await get_redis() + raw = await redis.get(key) + return json.loads(raw) if raw else None + + +async def _cache_set(key: str, data: dict) -> None: + redis = await get_redis() + await redis.set(key, json.dumps(data), ex=settings.SCRYFALL_CACHE_TTL_SECONDS) + + +@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=4)) +async def _get(path: str, params: dict | None = None) -> dict: + async with _get_semaphore(): + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get( + f"{SCRYFALL_BASE}{path}", + params=params, + headers={"User-Agent": "MTGDeckBuilder/1.0"}, + ) + resp.raise_for_status() + return resp.json() + + +async def get_card_by_name(name: str, exact: bool = True) -> Optional[dict]: + """Fetch a card by name. Uses fuzzy search if exact=False.""" + key = _cache_key("name", name) + cached = await _cache_get(key) + if cached: + return cached + + try: + if exact: + data = await _get("/cards/named", {"exact": name}) + else: + data = await _get("/cards/named", {"fuzzy": name}) + await _cache_set(key, data) + # Also cache by scryfall id + await _cache_set(_cache_key("id", data["id"]), data) + return data + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return None + raise + + +async def get_card_by_id(scryfall_id: str) -> Optional[dict]: + """Fetch a card by its Scryfall UUID.""" + key = _cache_key("id", scryfall_id) + cached = await _cache_get(key) + if cached: + return cached + + try: + data = await _get(f"/cards/{scryfall_id}") + await _cache_set(key, data) + return data + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return None + raise + + +async def get_card_by_set_and_collector(set_code: str, collector_number: str) -> Optional[dict]: + """Fetch a card by set + collector number (used during import enrichment).""" + key = _cache_key("setcol", f"{set_code}:{collector_number}") + cached = await _cache_get(key) + if cached: + return cached + + try: + data = await _get(f"/cards/{set_code.lower()}/{collector_number}") + await _cache_set(key, data) + await _cache_set(_cache_key("id", data["id"]), data) + return data + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return None + raise + + +async def batch_enrich_by_name(names: list[str]) -> dict[str, Optional[dict]]: + """ + Resolve up to 75 card names at once via Scryfall's /cards/collection endpoint. + Returns a dict of {name_lower: scryfall_data}. + Falls back to None for cards not found. + """ + results: dict[str, Optional[dict]] = {} + uncached: list[str] = [] + + # Check cache first + for name in names: + key = _cache_key("name", name) + cached = await _cache_get(key) + if cached: + results[name.lower()] = cached + else: + uncached.append(name) + + # Batch fetch uncached in chunks of 75 (Scryfall limit) + for chunk_start in range(0, len(uncached), 75): + chunk = uncached[chunk_start : chunk_start + 75] + identifiers = [{"name": n} for n in chunk] + + async with _get_semaphore(): + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.post( + f"{SCRYFALL_BASE}/cards/collection", + json={"identifiers": identifiers}, + headers={"User-Agent": "MTGDeckBuilder/1.0"}, + ) + resp.raise_for_status() + data = resp.json() + + for card in data.get("data", []): + name_lower = card["name"].lower() + results[name_lower] = card + await _cache_set(_cache_key("name", card["name"]), card) + await _cache_set(_cache_key("id", card["id"]), card) + + # Mark not-found as None + found_lower = {c["name"].lower() for c in data.get("data", [])} + for name in chunk: + if name.lower() not in found_lower: + results[name.lower()] = None + + # Be polite between chunks + if chunk_start + 75 < len(uncached): + await asyncio.sleep(0.15) + + return results + + +def extract_price_usd(scryfall_data: dict) -> Optional[float]: + """Pull the cheapest non-foil USD price from a Scryfall card object.""" + try: + price = scryfall_data.get("prices", {}).get("usd") + return float(price) if price else None + except (TypeError, ValueError): + return None + + +def card_image_url(scryfall_data: dict, size: str = "normal") -> Optional[str]: + """Return an image URL. Handles split/transform/MDFCs gracefully.""" + images = scryfall_data.get("image_uris") + if images: + return images.get(size) + # Double-faced cards store images per face + faces = scryfall_data.get("card_faces", []) + if faces: + return faces[0].get("image_uris", {}).get(size) + return None + + +def is_commander_legal(scryfall_data: dict) -> bool: + return scryfall_data.get("legalities", {}).get("commander") == "legal"