import os
import json
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger("plutus.recommender")
logging.basicConfig(level=logging.INFO)

_EMB_MODEL_NAME = os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")
_CACHE_DIR = os.getenv("HF_HOME", "/home/user/app")
_INDEX_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_index.faiss")
_META_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_meta.json")

try:
    from sentence_transformers import SentenceTransformer
    import faiss
except Exception:
    logger.warning("sentence-transformers or faiss not installed. Ensure both are in requirements.txt")


class Recommender:
    """
    Embedding-based semantic recommender for Plutus topics.

    Loads resources from recommend.json and builds a FAISS index for fast
    similarity search.
    """

    def __init__(
        self,
        recommend_json_path: str,
        emb_model_name: str = _EMB_MODEL_NAME,
        index_path: str = _INDEX_FILE,
        meta_path: str = _META_FILE,
    ):
        self.recommend_json_path = recommend_json_path
        self.emb_model_name = emb_model_name
        self.index_path = index_path
        self.meta_path = meta_path

        self.model = None
        self.index = None
        self.meta: List[Dict[str, Any]] = []
        self.topics_map: Dict[str, Any] = {}

        self._load_json()
        self._maybe_init_embedding_model()

        if os.path.exists(self.index_path) and os.path.exists(self.meta_path):
            try:
                self._load_index()
            except Exception:
                logger.exception("Index load failed; will rebuild on demand.")
        else:
            logger.info("No index found; will build when first used.")

    def _load_json(self):
        """Load the recommend.json file."""
        if not os.path.exists(self.recommend_json_path):
            raise FileNotFoundError(f"recommend.json not found at: {self.recommend_json_path}")
        with open(self.recommend_json_path, "r", encoding="utf-8") as f:
            self.topics_map = json.load(f)
        logger.info(f"Loaded recommend.json with {len(self.topics_map)} topics.")

    def _maybe_init_embedding_model(self):
        """Lazily load the sentence-transformers embedding model."""
        if self.model is None:
            try:
                from sentence_transformers import SentenceTransformer
                self.model = SentenceTransformer(self.emb_model_name)
                logger.info(f"Loaded embedding model: {self.emb_model_name}")
            except Exception as e:
                logger.exception(f"Failed to load embedding model: {e}")
                raise RuntimeError("Embedding model not available. Please check dependencies.") from e

    def build_index(self, force: bool = False):
        """
        Build the FAISS index from recommend.json.

        Each document and video becomes a searchable vector. The index and
        metadata are saved to disk when the filesystem is writable.
        """
        if self.index is not None and not force:
            logger.info("Index already built; skipping rebuild.")
            return

        import faiss

        # Flatten the topic map into parallel lists of metadata and texts to embed.
        items = []
        texts = []
        for topic, val in self.topics_map.items():
            for d in val.get("docs", []):
                items.append({"topic": topic, "type": "doc", "url": d})
                texts.append(f"{topic} doc {d}")
            for v in val.get("videos", []):
                items.append({"topic": topic, "type": "video", "url": v})
                texts.append(f"{topic} video {v}")

        if not texts:
            raise ValueError("No docs/videos found in recommend.json to index.")

        logger.info(f"Encoding {len(texts)} recommendation entries...")
        emb = self.model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        # L2-normalize so inner-product search is equivalent to cosine similarity.
        faiss.normalize_L2(emb)
        d = emb.shape[1]

        try:
            index = faiss.IndexFlatIP(d)
            index.add(emb)
            self.index = index
            self.meta = items
            try:
                faiss.write_index(self.index, self.index_path)
                with open(self.meta_path, "w", encoding="utf-8") as f:
                    json.dump(self.meta, f, ensure_ascii=False, indent=2)
                logger.info(f"Saved FAISS index and metadata ({len(items)} items).")
            except Exception:
                logger.warning("Could not persist index; running in memory only (likely a Hugging Face Space).")
        except Exception as e:
            logger.exception(f"Failed to build FAISS index: {e}")
            raise RuntimeError(f"Index build failed: {e}") from e

    def _load_index(self):
        """Load the index and metadata files from disk."""
        import faiss
        self.index = faiss.read_index(self.index_path)
        with open(self.meta_path, "r", encoding="utf-8") as f:
            self.meta = json.load(f)
        logger.info(f"Loaded FAISS index with {len(self.meta)} entries.")

    def recommend_for_query(self, query: str, top_k: int = 5, topic_boost: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Return the top_k recommended items for `query`.

        Uses cosine similarity (via normalized inner product). If `topic_boost`
        is given, items from that topic are ranked first.
        """
        if self.index is None:
            logger.info("Index not found in memory; building now.")
            self.build_index()

        import faiss

        q_emb = self.model.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(q_emb)

        # Over-fetch (top_k * 3) so deduplication still leaves enough results.
        D, I = self.index.search(q_emb, top_k * 3)

        results = []
        seen = set()
        for score, idx in zip(D[0], I[0]):
            if idx < 0:
                continue
            meta = self.meta[idx]
            key = (meta.get("url"), meta.get("type"))
            if key in seen:
                continue
            seen.add(key)
            results.append({
                "topic": meta.get("topic"),
                "type": meta.get("type"),
                "url": meta.get("url"),
                "score": float(score),
            })
            if len(results) >= top_k:
                break

        if topic_boost:
            # Boosted topic first, then by descending similarity score.
            results.sort(
                key=lambda x: (0 if x["topic"].lower() == topic_boost.lower() else 1, -x["score"])
            )
        else:
            results.sort(key=lambda x: -x["score"])

        logger.info(f"Recommended {len(results)} items for query: '{query}'")
        return results