import json
import logging
import os
from typing import Any, Dict, List, Optional

import numpy as np
# Module-level logger for the recommender component.
logger = logging.getLogger("plutus.recommender")
# NOTE(review): basicConfig at import time configures the root logger as a
# side effect. Fine for an app entry point, but confirm this module is not
# also consumed as a library, where the importer should own logging config.
logging.basicConfig(level=logging.INFO)
# Embedding model name; overridable via the EMB_MODEL_NAME env var.
_EMB_MODEL_NAME = os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")
# Directory for persisted index/metadata. Defaults presumably match a
# Hugging Face Space layout (/home/user/app) — TODO confirm for other deploys.
_CACHE_DIR = os.getenv("HF_HOME", "/home/user/app")
# On-disk locations for the FAISS index and its parallel metadata JSON.
_INDEX_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_index.faiss")
_META_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_meta.json")
# Best-effort import of the heavy optional dependencies. The module itself
# stays importable without them; Recommender methods re-import locally and
# fail with a clear error if they are truly missing.
try:
    from sentence_transformers import SentenceTransformer
    import faiss
except ImportError:  # narrowed from Exception: only missing-package is expected here
    logger.warning(" sentence-transformers or faiss not installed. Ensure both are in requirements.txt")
class Recommender:
    """
    Embedding-based semantic recommender for Plutus topics.

    Loads resources from recommend.json, embeds every doc/video entry with a
    sentence-transformers model, and builds a FAISS inner-product index over
    L2-normalized vectors (i.e. cosine similarity) for fast lookup. The index
    and its metadata are best-effort persisted to disk.
    """

    def __init__(
        self,
        recommend_json_path: str,
        emb_model_name: str = _EMB_MODEL_NAME,
        index_path: str = _INDEX_FILE,
        meta_path: str = _META_FILE,
    ):
        """
        Args:
            recommend_json_path: Path to recommend.json mapping
                topic -> {"docs": [...], "videos": [...]}.
            emb_model_name: sentence-transformers model id to load.
            index_path: Location of the persisted FAISS index.
            meta_path: Location of the persisted per-vector metadata JSON.

        Raises:
            FileNotFoundError: If recommend_json_path does not exist.
            RuntimeError: If the embedding model cannot be loaded.
        """
        self.recommend_json_path = recommend_json_path
        self.emb_model_name = emb_model_name
        self.index_path = index_path
        self.meta_path = meta_path
        self.model = None  # SentenceTransformer, set by _maybe_init_embedding_model
        self.index = None  # FAISS index, loaded from disk or built lazily
        self.meta: List[Dict[str, Any]] = []  # one {topic, type, url} per indexed vector
        self.topics_map: Dict[str, Any] = {}
        self._load_json()
        self._maybe_init_embedding_model()
        # Reuse a previously persisted index when both files exist; any load
        # failure degrades gracefully to an on-demand rebuild.
        if os.path.exists(self.index_path) and os.path.exists(self.meta_path):
            try:
                self._load_index()
            except Exception:
                logger.exception("Index load failed — will rebuild on demand.")
        else:
            logger.info("No index found — will build when first used.")

    def _load_json(self):
        """Load recommend.json into self.topics_map."""
        if not os.path.exists(self.recommend_json_path):
            raise FileNotFoundError(f"recommend.json not found at: {self.recommend_json_path}")
        with open(self.recommend_json_path, "r", encoding="utf-8") as f:
            self.topics_map = json.load(f)
        logger.info("Loaded recommend.json with %d topics.", len(self.topics_map))

    def _maybe_init_embedding_model(self):
        """Create the SentenceTransformer if not already loaded; no-op otherwise."""
        if self.model is None:
            try:
                # Local import so this module imports even when the top-level
                # best-effort import failed.
                from sentence_transformers import SentenceTransformer
                self.model = SentenceTransformer(self.emb_model_name)
                logger.info("Loaded embedding model: %s", self.emb_model_name)
            except Exception as e:
                logger.exception(" Failed to load embedding model: %s", e)
                # Chain the original cause so callers can see why loading failed.
                raise RuntimeError("Embedding model not available. Please check dependencies.") from e

    def build_index(self, force: bool = False):
        """
        Build the FAISS index from recommend.json.

        Each document and video URL becomes one searchable vector; parallel
        metadata is kept in self.meta. Index and metadata are persisted to
        disk on a best-effort basis (failure tolerated, e.g. read-only FS).

        Args:
            force: Rebuild even if an index is already in memory.

        Raises:
            ValueError: If recommend.json contains no docs/videos.
            RuntimeError: If FAISS index construction fails.
        """
        if self.index is not None and not force:
            logger.info("Index already built; skipping rebuild.")
            return
        # Fix: the original referenced the module-level `faiss` name, which is
        # undefined (NameError) when the guarded top-level import failed.
        # Import locally, consistent with _load_index/recommend_for_query.
        import faiss
        items: List[Dict[str, Any]] = []
        texts: List[str] = []
        for topic, val in self.topics_map.items():
            for d in val.get("docs", []):
                items.append({"topic": topic, "type": "doc", "url": d})
                texts.append(f"{topic} doc {d}")
            for v in val.get("videos", []):
                items.append({"topic": topic, "type": "video", "url": v})
                texts.append(f"{topic} video {v}")
        if not texts:
            raise ValueError("No docs/videos found in recommend.json to index.")
        logger.info("Encoding %d recommendation entries...", len(texts))
        emb = self.model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        # Normalize so inner-product search equals cosine similarity.
        faiss.normalize_L2(emb)
        dim = emb.shape[1]
        try:
            index = faiss.IndexFlatIP(dim)
            index.add(emb)
            self.index = index
            self.meta = items
            try:
                faiss.write_index(self.index, self.index_path)
                with open(self.meta_path, "w", encoding="utf-8") as f:
                    json.dump(self.meta, f, ensure_ascii=False, indent=2)
                logger.info("Saved FAISS index and metadata (%d items).", len(items))
            except Exception:
                # Persistence is optional: keep serving from memory.
                logger.warning(" Could not persist index — running in memory only (likely Hugging Face Space).")
        except Exception as e:
            logger.exception(" Failed to build FAISS index: %s", e)
            raise RuntimeError(f"Index build failed: {e}") from e

    def _load_index(self):
        """Load the persisted FAISS index and metadata files into memory."""
        import faiss
        self.index = faiss.read_index(self.index_path)
        with open(self.meta_path, "r", encoding="utf-8") as f:
            self.meta = json.load(f)
        logger.info("Loaded FAISS index with %d entries.", len(self.meta))

    def recommend_for_query(self, query: str, top_k: int = 5, topic_boost: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Return up to `top_k` recommended items for `query`.

        Uses cosine similarity (normalized inner product). Hits are
        deduplicated by (url, type). When `topic_boost` is given, items whose
        topic matches it case-insensitively are ranked first, then by score.

        Args:
            query: Free-text query to embed and search with.
            top_k: Maximum number of results to return.
            topic_boost: Optional topic name to promote in the ranking.

        Returns:
            List of dicts with keys: topic, type, url, score.
        """
        if self.index is None:
            logger.info("Index not found in memory — building now.")
            self.build_index()
        import faiss
        q_emb = self.model.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(q_emb)
        # Over-fetch (3x) so deduplication can still fill top_k results.
        D, I = self.index.search(q_emb, top_k * 3)
        results: List[Dict[str, Any]] = []
        seen = set()
        for score, idx in zip(D[0], I[0]):
            if idx < 0:  # FAISS pads with -1 when fewer than k vectors exist
                continue
            meta = self.meta[idx]
            key = (meta.get("url"), meta.get("type"))
            if key in seen:
                continue
            seen.add(key)
            results.append({
                "topic": meta.get("topic"),
                "type": meta.get("type"),
                "url": meta.get("url"),
                "score": float(score),
            })
            if len(results) >= top_k:
                break
        if topic_boost:
            # Two-key sort: boosted topic first, then descending score.
            results.sort(
                key=lambda x: (0 if x["topic"].lower() == topic_boost.lower() else 1, -x["score"])
            )
        else:
            results.sort(key=lambda x: -x["score"])
        logger.info("Recommended %d items for query: '%s'", len(results), query)
        return results