Restructure into full project layout

This commit is contained in:
2026-06-16 23:06:16 -06:00
parent de4862b2d1
commit 57765496a6
74 changed files with 4441 additions and 3 deletions
View File
View File
+142
View File
@@ -0,0 +1,142 @@
"""
Claude API client for deck generation.
Responsibilities:
1. Call the Anthropic API with the appropriate prompt
2. Parse and validate the JSON response
3. Strip [UNOWNED] markers and track ownership
4. Validate card names against Scryfall (batch)
5. Return a structured DeckPayload ready to persist
"""
import json
import re
from typing import Optional
import anthropic
from app.core.config import settings
from app.models.deck import CardSlot
from app.services import scryfall
_client: anthropic.AsyncAnthropic | None = None
def _get_client() -> anthropic.AsyncAnthropic:
global _client
if _client is None:
_client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
return _client
async def call_claude(
system_prompt: str,
user_message: str,
max_tokens: int = 8000,
) -> "DeckPayload":
client = _get_client()
message = await client.messages.create(
model=settings.ANTHROPIC_MODEL,
max_tokens=max_tokens,
system=system_prompt,
messages=[{"role": "user", "content": user_message}],
)
raw_text = _extract_text(message)
deck_json = _parse_json(raw_text)
payload = _build_payload(deck_json)
await _enrich_with_scryfall(payload)
return payload
UNOWNED_RE = re.compile(r"\s*\[UNOWNED\]\s*$", re.IGNORECASE)
def _extract_text(message) -> str:
for block in message.content:
if block.type == "text":
return block.text.strip()
raise ValueError("Claude returned no text content")
def _parse_json(text: str) -> dict:
text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.MULTILINE)
text = re.sub(r"\s*```$", "", text, flags=re.MULTILINE)
try:
return json.loads(text.strip())
except json.JSONDecodeError as e:
raise ValueError(f"Claude response was not valid JSON: {e}\n\nRaw: {text[:500]}")
def _parse_slot(raw: str) -> CardSlot:
normalised = raw.strip().lower()
try:
return CardSlot(normalised)
except ValueError:
mapping = {
"lands": CardSlot.LAND,
"creatures": CardSlot.CREATURE,
"instants": CardSlot.INSTANT,
"sorceries": CardSlot.SORCERY,
"artifacts": CardSlot.ARTIFACT,
"enchantments": CardSlot.ENCHANTMENT,
"planeswalkers": CardSlot.PLANESWALKER,
}
return mapping.get(normalised, CardSlot.CREATURE)
class CardEntry:
def __init__(self, raw: dict):
raw_name: str = raw.get("name", "").strip()
self.is_owned: bool = not bool(UNOWNED_RE.search(raw_name))
self.card_name: str = UNOWNED_RE.sub("", raw_name).strip()
self.slot: CardSlot = _parse_slot(raw.get("slot", "creature"))
self.quantity: int = max(1, int(raw.get("quantity", 1)))
self.reasoning: Optional[str] = raw.get("reasoning")
self.scryfall_id: str = ""
self.scryfall_data: dict = {}
class CutEntry:
def __init__(self, raw: dict):
self.card_name: str = raw.get("name", "").strip()
self.reasoning: Optional[str] = raw.get("reasoning")
class DeckPayload:
def __init__(self):
self.deck_name: str = ""
self.strategy_summary: str = ""
self.cards: list[CardEntry] = []
self.cuts: list[CutEntry] = []
self.unresolved: list[str] = []
def _build_payload(data: dict) -> DeckPayload:
payload = DeckPayload()
payload.deck_name = data.get("deck_name", "Untitled Deck")
payload.strategy_summary = data.get("strategy_summary", "")
for raw_card in data.get("cards", []):
if raw_card.get("name"):
payload.cards.append(CardEntry(raw_card))
for raw_cut in data.get("cuts", []):
if raw_cut.get("name"):
payload.cuts.append(CutEntry(raw_cut))
return payload
async def _enrich_with_scryfall(payload: DeckPayload) -> None:
names = [c.card_name for c in payload.cards]
name_map = await scryfall.batch_enrich_by_name(names)
for entry in payload.cards:
sf_data = name_map.get(entry.card_name.lower())
if sf_data:
entry.scryfall_id = sf_data["id"]
entry.scryfall_data = sf_data
else:
sf_data = await scryfall.get_card_by_name(entry.card_name, exact=False)
if sf_data:
entry.scryfall_id = sf_data["id"]
entry.scryfall_data = sf_data
entry.card_name = sf_data["name"]
else:
payload.unresolved.append(entry.card_name)
+25
View File
@@ -0,0 +1,25 @@
from app.schemas.deck import DeckConstraints
def build_constraint_context(constraints: DeckConstraints, owned_names: list[str] | None) -> str:
lines = []
if constraints.prefer_owned and owned_names:
lines.append("- Prefer cards the user already owns (marked [OWNED] in the list below)")
lines.append("- Mark any recommended card the user does NOT own with [UNOWNED] suffix")
if constraints.budget_enabled and constraints.budget_amount:
scope = "total deck" if constraints.budget_scope == "total" else "cards to purchase"
lines.append(f"- Budget limit: ${constraints.budget_amount:.2f} for {scope}")
if not lines:
lines.append("- No special constraints — recommend the strongest cards available")
return "\n".join(lines)
def build_owned_card_list(owned_names: list[str]) -> str:
if not owned_names:
return ""
card_list = "\n".join(f"- {name}" for name in sorted(owned_names))
return f"\nOWNED CARDS:\n{card_list}\n"
+130
View File
@@ -0,0 +1,130 @@
"""
Deck service: orchestrates prompt building, Claude call, and DB persistence.
"""
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.collection import CollectionCard
from app.models.deck import Deck, DeckCard, DeckMode
from app.schemas.deck import DeckConstraints, GenerateRequest, CompleteRequest, CullRequest
from app.services.ai import prompts
from app.services.ai.claude_client import call_claude, DeckPayload
from app.services.ai.constraints import build_constraint_context, build_owned_card_list
async def generate_deck(req: GenerateRequest, user_id: int, db: AsyncSession) -> Deck:
owned_names = await _owned_names(user_id, db) if req.constraints.prefer_owned else []
system, user_msg = prompts.generate_prompt(
commander=req.commander,
playstyle=req.playstyle,
constraint_text=build_constraint_context(req.constraints, owned_names or None),
owned_list_text=build_owned_card_list(owned_names),
)
payload = await call_claude(system, user_msg)
return await _persist_deck(
payload=payload, user_id=user_id, mode=DeckMode.GENERATE,
commander=req.commander, name=req.name or payload.deck_name,
description=req.description or payload.strategy_summary,
playstyle=req.playstyle, constraints=req.constraints,
owned_name_set=set(n.lower() for n in owned_names), db=db,
)
async def complete_deck(req: CompleteRequest, user_id: int, db: AsyncSession) -> Deck:
owned_names = await _owned_names(user_id, db) if req.constraints.prefer_owned else []
system, user_msg = prompts.complete_prompt(
commander=req.commander, playstyle=req.playstyle,
existing_cards=req.existing_cards,
constraint_text=build_constraint_context(req.constraints, owned_names or None),
owned_list_text=build_owned_card_list(owned_names),
)
payload = await call_claude(system, user_msg)
return await _persist_deck(
payload=payload, user_id=user_id, mode=DeckMode.COMPLETE,
commander=req.commander, name=req.name or payload.deck_name,
description=payload.strategy_summary, playstyle=req.playstyle,
constraints=req.constraints,
owned_name_set=set(n.lower() for n in owned_names), db=db,
)
async def cull_deck(req: CullRequest, user_id: int, db: AsyncSession) -> Deck:
owned_names = await _owned_names(user_id, db) if req.constraints.prefer_owned else []
owned_set = set(n.lower() for n in owned_names)
for card in req.existing_cards:
card["is_owned"] = card.get("card_name", "").lower() in owned_set
system, user_msg = prompts.cull_prompt(
commander=req.commander, existing_cards=req.existing_cards,
target_count=req.target_count,
constraint_text=build_constraint_context(req.constraints, owned_names or None),
owned_list_text=build_owned_card_list(owned_names),
prefer_owned=req.constraints.prefer_owned,
)
payload = await call_claude(system, user_msg, max_tokens=10000)
return await _persist_deck(
payload=payload, user_id=user_id, mode=DeckMode.CULL,
commander=req.commander, name=req.name or payload.deck_name,
description=payload.strategy_summary, playstyle=None,
constraints=req.constraints, owned_name_set=owned_set, db=db,
)
async def _persist_deck(
payload: DeckPayload, user_id: int, mode: DeckMode,
commander: str, name: str, description: str | None,
playstyle: str | None, constraints: DeckConstraints,
owned_name_set: set[str], db: AsyncSession,
) -> Deck:
deck = Deck(
owner_id=user_id, name=name, commander=commander,
description=description, mode=mode, playstyle=playstyle,
prefer_owned=constraints.prefer_owned,
budget_enabled=constraints.budget_enabled,
budget_amount=constraints.budget_amount,
budget_scope=constraints.budget_scope,
ai_reasoning={
"strategy_summary": payload.strategy_summary,
"unresolved_cards": payload.unresolved,
"cuts": [{"name": c.card_name, "reasoning": c.reasoning} for c in payload.cuts],
},
)
db.add(deck)
await db.flush()
deck_cards = [
DeckCard(
deck_id=deck.id, scryfall_id="", card_name=commander,
slot="creature", quantity=1,
is_owned=commander.lower() in owned_name_set,
is_commander=True,
)
]
for entry in payload.cards:
if not entry.scryfall_id:
continue
is_owned = (
entry.card_name.lower() in owned_name_set
if constraints.prefer_owned
else entry.is_owned
)
deck_cards.append(DeckCard(
deck_id=deck.id, scryfall_id=entry.scryfall_id,
card_name=entry.card_name, slot=entry.slot,
quantity=entry.quantity, is_owned=is_owned,
is_commander=False, ai_reasoning=entry.reasoning,
scryfall_data=entry.scryfall_data,
))
db.add_all(deck_cards)
await db.commit()
await db.refresh(deck)
return deck
async def _owned_names(user_id: int, db: AsyncSession) -> list[str]:
result = await db.execute(
select(CollectionCard.card_name).where(CollectionCard.owner_id == user_id)
)
return [row[0] for row in result.all()]
+150
View File
@@ -0,0 +1,150 @@
"""
Prompt templates for the three deck modes.
Output contract — Claude returns a single JSON object:
{
"deck_name": "string",
"strategy_summary": "string",
"cards": [
{
"name": "Card Name", // append [UNOWNED] when prefer_owned=True
"slot": "creature|instant|sorcery|enchantment|artifact|planeswalker|land|battle",
"quantity": 1,
"reasoning": "1-2 sentences"
}
],
"cuts": [ // CULL mode only
{ "name": "Card Name", "reasoning": "..." }
]
}
"""
SYSTEM_PROMPT = (
"You are an expert Magic: The Gathering deck builder specialising in the Commander "
"(EDH) format. You have deep knowledge of card synergies, mana curves, colour "
"identity rules, staples, budget alternatives, and current metagame trends.\n\n"
"You always respond with a single valid JSON object — no markdown fences, no "
"preamble, no commentary outside the JSON. Your card names must exactly match "
"official Magic card names (English). Every card must be legal in Commander "
"and within the commander's colour identity."
)
def generate_prompt(
commander: str,
playstyle: str | None,
constraint_text: str,
owned_list_text: str,
) -> tuple[str, str]:
playstyle_line = (
f"Playstyle preference: {playstyle}"
if playstyle
else "Playstyle preference: not specified — choose the strongest strategy for this commander."
)
user_message = (
f"Build a complete Commander deck for the following commander.\n\n"
f"COMMANDER: {commander}\n"
f"{playstyle_line}\n\n"
f"CONSTRAINTS:\n{constraint_text}\n"
f"{owned_list_text}\n"
f"DECK REQUIREMENTS:\n"
f"- Exactly 99 cards (not counting the commander)\n"
f"- All cards must be legal in Commander and within {commander}'s colour identity\n"
f"- Include a balanced mana base (35-40 lands for most strategies)\n"
f"- Include ramp (8-12 pieces), card draw (8-10 pieces), removal (8-10 pieces), "
f"and win conditions appropriate to the playstyle\n"
f"- Slot values: creature, instant, sorcery, enchantment, artifact, planeswalker, land, battle\n"
f"- Quantity for basic lands may be >1; all other cards quantity = 1\n"
f"- The 'reasoning' field must explain why the card fits THIS specific deck\n\n"
f"Respond with the JSON object only."
)
return SYSTEM_PROMPT, user_message
def complete_prompt(
commander: str,
playstyle: str | None,
existing_cards: list[dict],
constraint_text: str,
owned_list_text: str,
) -> tuple[str, str]:
existing_count = sum(c.get("quantity", 1) for c in existing_cards)
slots_needed = 99 - existing_count
playstyle_line = (
f"Playstyle preference: {playstyle}"
if playstyle
else "Playstyle preference: infer from existing cards."
)
existing_formatted = "\n".join(
f"- {c['card_name']} ({c.get('slot', 'unknown')})"
+ (f" x{c['quantity']}" if c.get('quantity', 1) > 1 else "")
for c in existing_cards
)
user_message = (
f"The user has a partial Commander deck and needs suggestions to complete it.\n\n"
f"COMMANDER: {commander}\n"
f"{playstyle_line}\n"
f"SLOTS NEEDED: {slots_needed} more cards to reach 99\n\n"
f"EXISTING CARDS ({existing_count} cards):\n{existing_formatted}\n\n"
f"CONSTRAINTS:\n{constraint_text}\n"
f"{owned_list_text}\n"
f"INSTRUCTIONS:\n"
f"- Suggest exactly {slots_needed} new cards to fill remaining slots\n"
f"- Do not repeat any card already in the existing list\n"
f"- All cards must be legal in Commander and within {commander}'s colour identity\n"
f"- Analyse existing cards to infer strategy and fill gaps (ramp, draw, removal, win-cons)\n"
f"- The 'cards' array contains ONLY the new cards you are recommending\n"
f"- strategy_summary should describe how the completed deck plays\n\n"
f"Respond with the JSON object only."
)
return SYSTEM_PROMPT, user_message
def cull_prompt(
commander: str,
existing_cards: list[dict],
target_count: int,
constraint_text: str,
owned_list_text: str,
prefer_owned: bool,
) -> tuple[str, str]:
current_count = sum(c.get("quantity", 1) for c in existing_cards)
cuts_needed = current_count - target_count
existing_formatted = "\n".join(
f"- {c['card_name']} ({c.get('slot', 'unknown')})"
+ (" [OWNED]" if c.get("is_owned") else "")
for c in existing_cards
)
ownership_note = (
"\n- IMPORTANT: Prioritise cutting cards NOT marked [OWNED] first — "
"this saves the user money on cards they would have to buy."
if prefer_owned
else ""
)
user_message = (
f"The user's Commander deck is oversized and needs to be culled.\n\n"
f"COMMANDER: {commander}\n"
f"CURRENT SIZE: {current_count} cards\n"
f"TARGET SIZE: {target_count} cards\n"
f"CUTS NEEDED: {cuts_needed} cards\n\n"
f"CURRENT DECKLIST:\n{existing_formatted}\n\n"
f"CONSTRAINTS:\n{constraint_text}\n"
f"{owned_list_text}\n"
f"INSTRUCTIONS:\n"
f"- Recommend exactly {cuts_needed} cards to cut\n"
f"- Identify redundancy, weak synergy, overcosted cards, and cards that "
f"don't advance the primary strategy\n"
f"- Order the 'cuts' array from most-recommended to least-recommended cut{ownership_note}\n"
f"- The 'cards' array contains all {target_count} REMAINING cards after cuts\n"
f"- Provide specific reasoning for each cut explaining why it's weaker than what stays\n"
f"- strategy_summary describes the refined deck after cuts\n\n"
f"Respond with the JSON object only."
)
return SYSTEM_PROMPT, user_message
+160
View File
@@ -0,0 +1,160 @@
"""
Archidekt collection importer.
Supports two export formats:
- CSV (Collection -> Export -> CSV)
- JSON (Collection -> Export -> JSON)
CSV columns (current Archidekt format, as of 2024):
Quantity, Foil Quantity, Card Name, Set Code, Collector Number, ...
JSON format (array of card objects):
[{"quantity": 1, "foilQuantity": 0, "card": {"name": "...", "set": {"code": "..."}, "collectorNumber": "..."}}]
"""
import csv
import io
import json
from typing import Union
import chardet
from app.services.imports.models import RawCardRow
SOURCE = "archidekt"
# CSV column name aliases — Archidekt has changed these over time
_NAME_COLS = {"card name", "name", "cardname"}
_QTY_COLS = {"quantity", "qty", "count", "amount"}
_FOIL_COLS = {"foil quantity", "foil qty", "foilqty", "foilcount", "foils"}
_SET_COLS = {"set code", "set", "setcode", "edition"}
_COLLECTOR_COLS = {"collector number", "collector #", "collectornumber", "number"}
def parse(raw_bytes: bytes) -> list[RawCardRow]:
"""
Auto-detect CSV vs JSON and dispatch to the appropriate parser.
Raises ValueError with a human-readable message on unrecognised format.
"""
# Detect encoding
detected = chardet.detect(raw_bytes)
encoding = detected.get("encoding") or "utf-8"
text = raw_bytes.decode(encoding, errors="replace").strip()
if text.startswith("[") or text.startswith("{"):
return _parse_json(text)
else:
return _parse_csv(text)
def _parse_csv(text: str) -> list[RawCardRow]:
reader = csv.DictReader(io.StringIO(text))
if not reader.fieldnames:
raise ValueError("Archidekt CSV appears to be empty or has no header row")
# Normalise header names
headers = {h.strip().lower(): h for h in reader.fieldnames if h}
name_col = _find_col(headers, _NAME_COLS, "Card Name")
qty_col = _find_col(headers, _QTY_COLS, "Quantity")
foil_col = _find_col(headers, _FOIL_COLS, None, required=False)
set_col = _find_col(headers, _SET_COLS, None, required=False)
collect_col = _find_col(headers, _COLLECTOR_COLS, None, required=False)
rows: list[RawCardRow] = []
for i, row in enumerate(reader, start=2): # start=2 because row 1 is header
name = row.get(name_col, "").strip()
if not name:
continue
try:
qty = int(float(row.get(qty_col, "1") or "1"))
except (ValueError, TypeError):
qty = 1
try:
foil_qty = int(float(row.get(foil_col, "0") or "0")) if foil_col else 0
except (ValueError, TypeError):
foil_qty = 0
rows.append(RawCardRow(
card_name=name,
set_code=(row.get(set_col, "") or "").strip().lower() if set_col else "",
collector_number=(row.get(collect_col, "") or "").strip() if collect_col else "",
quantity=qty,
foil_quantity=foil_qty,
import_source=SOURCE,
raw=dict(row),
))
if not rows:
raise ValueError("Archidekt CSV contained no card rows")
return rows
def _parse_json(text: str) -> list[RawCardRow]:
try:
data = json.loads(text)
except json.JSONDecodeError as e:
raise ValueError(f"Archidekt JSON is malformed: {e}")
# Handle both bare array and {"cards": [...]} wrapper
if isinstance(data, dict):
data = data.get("cards") or data.get("collection") or []
if not isinstance(data, list):
raise ValueError("Archidekt JSON must be an array of card objects")
rows: list[RawCardRow] = []
for item in data:
card = item.get("card") or item # nested or flat
name = (
card.get("name")
or card.get("cardName")
or card.get("card_name")
or ""
).strip()
if not name:
continue
qty = int(item.get("quantity") or item.get("qty") or 1)
foil_qty = int(item.get("foilQuantity") or item.get("foil_quantity") or 0)
set_obj = card.get("set") or {}
set_code = (
set_obj.get("code") if isinstance(set_obj, dict) else str(set_obj)
).lower().strip()
collector = str(card.get("collectorNumber") or card.get("collector_number") or "").strip()
rows.append(RawCardRow(
card_name=name,
set_code=set_code,
collector_number=collector,
quantity=qty,
foil_quantity=foil_qty,
import_source=SOURCE,
raw=item,
))
if not rows:
raise ValueError("Archidekt JSON contained no card entries")
return rows
def _find_col(
headers: dict[str, str],
aliases: set[str],
fallback: str | None,
required: bool = True,
) -> str | None:
for alias in aliases:
if alias in headers:
return headers[alias]
if fallback and fallback.lower() in headers:
return headers[fallback.lower()]
if required:
raise ValueError(
f"Could not find required column. Expected one of: {sorted(aliases)}"
)
return None
@@ -0,0 +1,63 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from app.models.collection import CollectionCard
from app.services.imports.models import RawCardRow
from app.services import scryfall
async def enrich_and_upsert(rows: list[RawCardRow], user_id: int, db: AsyncSession) -> int:
# Try set+collector first for exact matches
enriched = {}
name_lookup = []
for row in rows:
if row.set_code and row.collector_number:
card = await scryfall.get_card_by_set_and_collector(row.set_code, row.collector_number)
if card:
enriched[row.card_name.lower()] = card
continue
name_lookup.append(row.card_name)
# Batch enrich remaining by name
if name_lookup:
name_map = await scryfall.batch_enrich_by_name(name_lookup)
enriched.update(name_map)
count = 0
for row in rows:
sf_data = enriched.get(row.card_name.lower())
if not sf_data:
continue
scryfall_id = sf_data["id"]
# Upsert — add quantities if card already exists
result = await db.execute(
select(CollectionCard).where(
CollectionCard.owner_id == user_id,
CollectionCard.scryfall_id == scryfall_id,
)
)
existing = result.scalar_one_or_none()
if existing:
existing.quantity += row.quantity
existing.foil_quantity += row.foil_quantity
existing.scryfall_data = sf_data
else:
db.add(CollectionCard(
owner_id=user_id,
card_name=sf_data.get("name", row.card_name),
set_code=sf_data.get("set", row.set_code),
collector_number=sf_data.get("collector_number", row.collector_number),
quantity=row.quantity,
foil_quantity=row.foil_quantity,
scryfall_id=scryfall_id,
scryfall_data=sf_data,
))
count += 1
await db.commit()
return count
+99
View File
@@ -0,0 +1,99 @@
"""
Manabox collection importer.
Manabox exports a single CSV format:
Name, Set code, Collector number, Foil, Quantity, ...
The "Foil" column is a boolean ("Yes"/"No" or "1"/"0") applied per-row.
Manabox creates separate rows for foil and non-foil copies of the same card,
so we merge them into a single RawCardRow (quantity + foil_quantity).
"""
import csv
import io
from collections import defaultdict
import chardet
from app.services.imports.models import RawCardRow
SOURCE = "manabox"
_NAME_COLS = {"name", "card name", "cardname", "card_name"}
_SET_COLS = {"set code", "set", "setcode", "set_code", "edition code"}
_COLLECTOR_COLS = {"collector number", "collector #", "collectornumber", "number"}
_FOIL_COLS = {"foil", "is foil", "isfoil"}
_QTY_COLS = {"quantity", "qty", "count", "amount"}
def parse(raw_bytes: bytes) -> list[RawCardRow]:
detected = chardet.detect(raw_bytes)
encoding = detected.get("encoding") or "utf-8"
text = raw_bytes.decode(encoding, errors="replace").strip()
reader = csv.DictReader(io.StringIO(text))
if not reader.fieldnames:
raise ValueError("Manabox CSV appears to be empty or has no header row")
headers = {h.strip().lower(): h for h in reader.fieldnames if h}
name_col = _find_col(headers, _NAME_COLS, "Name")
set_col = _find_col(headers, _SET_COLS, None, required=False)
collect_col = _find_col(headers, _COLLECTOR_COLS, None, required=False)
foil_col = _find_col(headers, _FOIL_COLS, None, required=False)
qty_col = _find_col(headers, _QTY_COLS, "Quantity")
# Key: (card_name, set_code, collector_number) → merged row
merged: dict[tuple, RawCardRow] = defaultdict(lambda: RawCardRow(import_source=SOURCE))
for row in reader:
name = row.get(name_col, "").strip()
if not name:
continue
set_code = (row.get(set_col, "") or "").strip().lower() if set_col else ""
collector = (row.get(collect_col, "") or "").strip() if collect_col else ""
key = (name.lower(), set_code, collector)
try:
qty = int(float(row.get(qty_col, "1") or "1"))
except (ValueError, TypeError):
qty = 1
is_foil = _parse_bool(row.get(foil_col, "")) if foil_col else False
entry = merged[key]
entry.card_name = name
entry.set_code = set_code
entry.collector_number = collector
if is_foil:
entry.foil_quantity += qty
else:
entry.quantity += qty
if not merged:
raise ValueError("Manabox CSV contained no card rows")
return list(merged.values())
def _parse_bool(value: str) -> bool:
return str(value).strip().lower() in {"yes", "true", "1", "y"}
def _find_col(
headers: dict[str, str],
aliases: set[str],
fallback: str | None,
required: bool = True,
) -> str | None:
for alias in aliases:
if alias in headers:
return headers[alias]
if fallback and fallback.lower() in headers:
return headers[fallback.lower()]
if required:
raise ValueError(
f"Could not find required column. Expected one of: {sorted(aliases)}"
)
return None
+12
View File
@@ -0,0 +1,12 @@
from dataclasses import dataclass, field
@dataclass
class RawCardRow:
card_name: str = ""
set_code: str = ""
collector_number: str = ""
quantity: int = 0
foil_quantity: int = 0
import_source: str = ""
raw: dict = field(default_factory=dict)
+187
View File
@@ -0,0 +1,187 @@
"""
Scryfall API client with Redis caching.
Respects Scryfall's rate limit (10 req/s with a small safety margin).
Cards are cached for 24 hours by default (SCRYFALL_CACHE_TTL_SECONDS).
"""
import asyncio
import json
from typing import Optional
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
from app.core.config import settings
from app.core.redis import get_redis
SCRYFALL_BASE = "https://api.scryfall.com"
_semaphore: asyncio.Semaphore | None = None
def _get_semaphore() -> asyncio.Semaphore:
global _semaphore
if _semaphore is None:
_semaphore = asyncio.Semaphore(settings.SCRYFALL_RATE_LIMIT_RPS)
return _semaphore
def _cache_key(kind: str, value: str) -> str:
return f"scryfall:{kind}:{value.lower()}"
async def _cache_get(key: str) -> Optional[dict]:
redis = await get_redis()
raw = await redis.get(key)
return json.loads(raw) if raw else None
async def _cache_set(key: str, data: dict) -> None:
redis = await get_redis()
await redis.set(key, json.dumps(data), ex=settings.SCRYFALL_CACHE_TTL_SECONDS)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, min=0.5, max=4))
async def _get(path: str, params: dict | None = None) -> dict:
async with _get_semaphore():
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{SCRYFALL_BASE}{path}",
params=params,
headers={"User-Agent": "MTGDeckBuilder/1.0"},
)
resp.raise_for_status()
return resp.json()
async def get_card_by_name(name: str, exact: bool = True) -> Optional[dict]:
"""Fetch a card by name. Uses fuzzy search if exact=False."""
key = _cache_key("name", name)
cached = await _cache_get(key)
if cached:
return cached
try:
if exact:
data = await _get("/cards/named", {"exact": name})
else:
data = await _get("/cards/named", {"fuzzy": name})
await _cache_set(key, data)
# Also cache by scryfall id
await _cache_set(_cache_key("id", data["id"]), data)
return data
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None
raise
async def get_card_by_id(scryfall_id: str) -> Optional[dict]:
"""Fetch a card by its Scryfall UUID."""
key = _cache_key("id", scryfall_id)
cached = await _cache_get(key)
if cached:
return cached
try:
data = await _get(f"/cards/{scryfall_id}")
await _cache_set(key, data)
return data
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None
raise
async def get_card_by_set_and_collector(set_code: str, collector_number: str) -> Optional[dict]:
"""Fetch a card by set + collector number (used during import enrichment)."""
key = _cache_key("setcol", f"{set_code}:{collector_number}")
cached = await _cache_get(key)
if cached:
return cached
try:
data = await _get(f"/cards/{set_code.lower()}/{collector_number}")
await _cache_set(key, data)
await _cache_set(_cache_key("id", data["id"]), data)
return data
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None
raise
async def batch_enrich_by_name(names: list[str]) -> dict[str, Optional[dict]]:
"""
Resolve up to 75 card names at once via Scryfall's /cards/collection endpoint.
Returns a dict of {name_lower: scryfall_data}.
Falls back to None for cards not found.
"""
results: dict[str, Optional[dict]] = {}
uncached: list[str] = []
# Check cache first
for name in names:
key = _cache_key("name", name)
cached = await _cache_get(key)
if cached:
results[name.lower()] = cached
else:
uncached.append(name)
# Batch fetch uncached in chunks of 75 (Scryfall limit)
for chunk_start in range(0, len(uncached), 75):
chunk = uncached[chunk_start : chunk_start + 75]
identifiers = [{"name": n} for n in chunk]
async with _get_semaphore():
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
f"{SCRYFALL_BASE}/cards/collection",
json={"identifiers": identifiers},
headers={"User-Agent": "MTGDeckBuilder/1.0"},
)
resp.raise_for_status()
data = resp.json()
for card in data.get("data", []):
name_lower = card["name"].lower()
results[name_lower] = card
await _cache_set(_cache_key("name", card["name"]), card)
await _cache_set(_cache_key("id", card["id"]), card)
# Mark not-found as None
found_lower = {c["name"].lower() for c in data.get("data", [])}
for name in chunk:
if name.lower() not in found_lower:
results[name.lower()] = None
# Be polite between chunks
if chunk_start + 75 < len(uncached):
await asyncio.sleep(0.15)
return results
def extract_price_usd(scryfall_data: dict) -> Optional[float]:
"""Pull the cheapest non-foil USD price from a Scryfall card object."""
try:
price = scryfall_data.get("prices", {}).get("usd")
return float(price) if price else None
except (TypeError, ValueError):
return None
def card_image_url(scryfall_data: dict, size: str = "normal") -> Optional[str]:
"""Return an image URL. Handles split/transform/MDFCs gracefully."""
images = scryfall_data.get("image_uris")
if images:
return images.get(size)
# Double-faced cards store images per face
faces = scryfall_data.get("card_faces", [])
if faces:
return faces[0].get("image_uris", {}).get(size)
return None
def is_commander_legal(scryfall_data: dict) -> bool:
return scryfall_data.get("legalities", {}).get("commander") == "legal"