190 lines
5.9 KiB
Python
190 lines
5.9 KiB
Python
"""
|
|
Claude API client for deck generation.

Responsibilities:
  1. Call the Anthropic API with the appropriate prompt
  2. Parse and validate the JSON response
  3. Strip [UNOWNED] markers and track ownership
  4. Validate card names against Scryfall (batch)
  5. Return a structured DeckPayload ready to persist
|
|
"""
|
|
import json
|
|
import re
|
|
from typing import Optional
|
|
|
|
import anthropic
|
|
|
|
from app.core.config import settings
|
|
from app.models.deck import CardSlot
|
|
from app.services import scryfall
|
|
|
|
# Module-level cache for the async Anthropic client; stays None until
# _get_client() creates the instance on first use.
_client: anthropic.AsyncAnthropic | None = None
|
|
|
|
|
|
def _get_client() -> anthropic.AsyncAnthropic:
    """Return the shared async Anthropic client, creating it on first call.

    The instance is stored in the module-level ``_client`` variable so that
    later calls reuse it. It is configured with ``settings.ANTHROPIC_API_KEY``.
    """
    global _client
    if _client is not None:
        return _client
    _client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
    return _client
|
|
|
|
|
|
async def call_claude(
    system_prompt: str,
    user_message: str,
    max_tokens: int = 8000,
) -> "DeckPayload":
    """Request a deck from Claude and return it as an enriched DeckPayload.

    Sends a single user message (with the given system prompt) through the
    shared Anthropic client, extracts the text of the reply, parses it as
    JSON, builds a DeckPayload from it and finally runs the Scryfall
    enrichment step on that payload.

    Args:
        system_prompt: Text passed as the ``system`` argument of the request.
        user_message: Content of the single ``user`` message.
        max_tokens: Forwarded unchanged as the request's ``max_tokens``.

    Returns:
        The payload produced by ``_build_payload`` after
        ``_enrich_with_scryfall`` has been awaited on it.

    Raises:
        ValueError: Propagated from ``_extract_text`` / ``_parse_json`` when
            the reply has no text block or cannot be parsed.
    """
    response = await _get_client().messages.create(
        model=settings.ANTHROPIC_MODEL,
        max_tokens=max_tokens,
        system=system_prompt,
        messages=[{"role": "user", "content": user_message}],
    )

    # text -> JSON -> payload, then attach Scryfall data in place
    payload = _build_payload(_parse_json(_extract_text(response)))
    await _enrich_with_scryfall(payload)
    return payload
|
|
|
|
|
|
# Matches a "[UNOWNED]" marker (any letter case) at the very end of a string,
# together with any whitespace directly before and after it.
UNOWNED_RE = re.compile(r"\s*\[UNOWNED\]\s*$", re.IGNORECASE)
|
|
|
|
|
|
def _extract_text(message) -> str:
|
|
for block in message.content:
|
|
if block.type == "text":
|
|
return block.text.strip()
|
|
raise ValueError("Claude returned no text content")
|
|
|
|
|
|
def _parse_json(text: str) -> dict:
|
|
# Strip markdown fences
|
|
text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.MULTILINE)
|
|
text = re.sub(r"\s*```$", "", text, flags=re.MULTILINE)
|
|
text = text.strip()
|
|
|
|
# First attempt: standard parse
|
|
try:
|
|
return json.loads(text)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Second attempt: try with json5/demjson-style fixes
|
|
# Fix unescaped apostrophes inside JSON strings heuristically
|
|
# Replace 's that appear mid-word inside a string context
|
|
fixed = re.sub(r"(?<=[a-zA-Z])'(?=[a-zA-Z])", "\\'", text)
|
|
try:
|
|
return json.loads(fixed)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Third attempt: extract just the cards array if top-level parse fails
|
|
# This handles truncated responses
|
|
cards_match = re.search(r'"cards"\s*:\s*(\[.*?\])\s*[,}]', text, re.DOTALL)
|
|
decklist_match = re.search(r'"decklist"\s*:\s*(\[.*?\])\s*[,}]', text, re.DOTALL)
|
|
deck_match = re.search(r'"deck"\s*:\s*(\[.*?\])\s*[,}]', text, re.DOTALL)
|
|
|
|
cards_text = None
|
|
for match in [cards_match, decklist_match, deck_match]:
|
|
if match:
|
|
cards_text = match.group(1)
|
|
break
|
|
|
|
if cards_text:
|
|
try:
|
|
cards = json.loads(cards_text)
|
|
return {"deck_name": "Generated Deck", "strategy_summary": "", "cards": cards}
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Last resort: raise with original error
|
|
try:
|
|
json.loads(text)
|
|
except json.JSONDecodeError as e:
|
|
raise ValueError(f"Claude response was not valid JSON: {e}\n\nRaw: {text[:500]}")
|
|
|
|
|
|
def _parse_slot(raw: str) -> CardSlot:
    """Translate a free-form slot label into a CardSlot.

    The label is trimmed and lower-cased, then passed to ``CardSlot(...)``.
    If that raises ValueError, a table of plural spellings is consulted; any
    label not found there falls back to ``CardSlot.CREATURE``.

    Args:
        raw: Slot label as it appeared in the model output.

    Returns:
        The matching CardSlot, or ``CardSlot.CREATURE`` for unknown labels.
    """
    key = raw.strip().lower()
    try:
        slot = CardSlot(key)
    except ValueError:
        # Plural forms that the direct CardSlot lookup did not accept.
        plural_aliases = {
            "lands": CardSlot.LAND,
            "creatures": CardSlot.CREATURE,
            "instants": CardSlot.INSTANT,
            "sorceries": CardSlot.SORCERY,
            "artifacts": CardSlot.ARTIFACT,
            "enchantments": CardSlot.ENCHANTMENT,
            "planeswalkers": CardSlot.PLANESWALKER,
        }
        slot = plural_aliases.get(key, CardSlot.CREATURE)
    return slot
|
|
|
|
|
|
class CardEntry:
    """One card of a generated deck, normalised from a raw JSON mapping.

    Model output is not guaranteed to be well-typed, so every field is
    coerced defensively: keys that are missing *or* explicitly null fall back
    to defaults instead of raising.
    """

    def __init__(self, raw: dict):
        """Build the entry from a raw card mapping.

        Args:
            raw: Mapping that may contain "name", "slot", "quantity" and
                "reasoning". A trailing "[UNOWNED]" marker on the name is
                stripped and recorded in ``is_owned``.
        """
        # `.get(key) or default` (rather than `.get(key, default)`) also
        # covers keys that are present with a null/empty value.
        raw_name: str = str(raw.get("name") or "").strip()
        # Owned unless the name carried the [UNOWNED] marker.
        self.is_owned: bool = not bool(UNOWNED_RE.search(raw_name))
        # Card name with the marker removed.
        self.card_name: str = UNOWNED_RE.sub("", raw_name).strip()
        self.slot: CardSlot = _parse_slot(str(raw.get("slot") or "creature"))
        # Always at least 1; unparseable values count as a single copy.
        self.quantity: int = self._coerce_quantity(raw.get("quantity"))
        self.reasoning: Optional[str] = raw.get("reasoning")
        # Left empty here; _enrich_with_scryfall fills these in on a match.
        self.scryfall_id: str = ""
        self.scryfall_data: dict = {}

    @staticmethod
    def _coerce_quantity(value) -> int:
        """Return *value* as an int of at least 1, or 1 if it is not numeric."""
        try:
            return max(1, int(value))
        except (TypeError, ValueError, OverflowError):
            # None, "4x", "", infinity, nested structures, ...
            return 1
|
|
|
|
|
|
class CutEntry:
    """A card listed under "cuts", with the optional reasoning given for it."""

    def __init__(self, raw: dict):
        """Read ``name`` (whitespace-trimmed) and ``reasoning`` from *raw*.

        Args:
            raw: Mapping for one cut; a missing "name" yields an empty string
                and a missing "reasoning" yields None.
        """
        name = raw.get("name", "")
        reasoning = raw.get("reasoning")
        self.card_name: str = name.strip()
        self.reasoning: Optional[str] = reasoning
|
|
|
|
|
|
class DeckPayload:
    """Mutable container for a parsed deck; starts out completely empty."""

    def __init__(self):
        """Initialise every field to an empty string or a fresh empty list."""
        # Text fields
        self.deck_name: str = ""
        self.strategy_summary: str = ""

        # Collections: each instance gets its own lists
        self.cards: list[CardEntry] = list()
        self.cuts: list[CutEntry] = list()
        # Card names for which no Scryfall match was found
        self.unresolved: list[str] = list()
|
|
|
|
|
|
def _build_payload(data: dict) -> DeckPayload:
    """Turn the decoded JSON response into a DeckPayload.

    Every field tolerates a key that is missing, null or empty, so that a
    partially filled response degrades to defaults instead of raising.

    Args:
        data: Decoded response. The card list is read from "cards", falling
            back to "decklist" and then "deck"; cuts are read from "cuts".

    Returns:
        A DeckPayload whose ``cards`` and ``cuts`` hold one entry per list
        item that is a mapping with a non-empty "name". Other items are
        skipped.
    """
    payload = DeckPayload()
    # `or` instead of a .get default: a key present with null (or an empty
    # string) must not leak None into the str-typed fields.
    payload.deck_name = data.get("deck_name") or "Untitled Deck"
    payload.strategy_summary = data.get("strategy_summary") or ""

    # Handle both 'cards' and alternative keys Claude sometimes returns
    cards_list = data.get("cards") or data.get("decklist") or data.get("deck") or []
    # Same null-tolerance for cuts as for cards.
    cuts_list = data.get("cuts") or []

    # Non-mapping items (e.g. bare strings) have no .get and are skipped.
    payload.cards.extend(
        CardEntry(raw_card)
        for raw_card in cards_list
        if isinstance(raw_card, dict) and raw_card.get("name")
    )
    payload.cuts.extend(
        CutEntry(raw_cut)
        for raw_cut in cuts_list
        if isinstance(raw_cut, dict) and raw_cut.get("name")
    )
    return payload
|
|
|
|
|
|
async def _enrich_with_scryfall(payload: DeckPayload) -> None:
    """Attach Scryfall data to each card in *payload*, mutating it in place.

    All card names are first sent to ``scryfall.batch_enrich_by_name``; the
    result is looked up by lower-cased card name. A card missing from that
    result is retried individually with
    ``scryfall.get_card_by_name(..., exact=False)``. On an individual match
    the entry's ``card_name`` is also overwritten with the name Scryfall
    returned. Cards that match neither way have their name appended to
    ``payload.unresolved``.

    Args:
        payload: Deck whose ``cards`` entries receive ``scryfall_id`` and
            ``scryfall_data``.
    """
    lookup = await scryfall.batch_enrich_by_name(
        [card.card_name for card in payload.cards]
    )

    for card in payload.cards:
        batch_hit = lookup.get(card.card_name.lower())
        if batch_hit:
            card.scryfall_id = batch_hit["id"]
            card.scryfall_data = batch_hit
            continue

        # Not in the batch result: fall back to a non-exact single lookup.
        fuzzy_hit = await scryfall.get_card_by_name(card.card_name, exact=False)
        if not fuzzy_hit:
            payload.unresolved.append(card.card_name)
            continue

        card.scryfall_id = fuzzy_hit["id"]
        card.scryfall_data = fuzzy_hit
        # Adopt the name reported by Scryfall for this match.
        card.card_name = fuzzy_hit["name"]
|