# advanced_plutus_ai/app/recommender.py
# Initial commit (Remostart, rev 815b4f4)
import json
import logging
import os
from typing import Any, Dict, List, Optional

import numpy as np
# Module-level logger; basicConfig is a no-op if the host app already
# configured logging, otherwise it enables INFO output for this module.
logger = logging.getLogger("plutus.recommender")
logging.basicConfig(level=logging.INFO)

# Embedding model and cache locations, overridable via environment variables.
# Defaults target a Hugging Face Space layout (HF_HOME -> /home/user/app).
_EMB_MODEL_NAME = os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")
_CACHE_DIR = os.getenv("HF_HOME", "/home/user/app")
_INDEX_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_index.faiss")
_META_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_meta.json")

# Best-effort import probe at module load: both packages are required at
# runtime, but a failure here only warns so the module itself still imports
# (methods re-import locally and will raise when actually used).
try:
    from sentence_transformers import SentenceTransformer
    import faiss
except Exception:
    logger.warning(" sentence-transformers or faiss not installed. Ensure both are in requirements.txt")
class Recommender:
    """
    Embedding-based semantic recommender for Plutus topics.

    Loads resources from ``recommend.json`` and builds a FAISS index
    (cosine similarity via L2-normalized inner product) for fast
    similarity search over each topic's docs and videos.
    """

    def __init__(
        self,
        recommend_json_path: str,
        emb_model_name: str = _EMB_MODEL_NAME,
        index_path: str = _INDEX_FILE,
        meta_path: str = _META_FILE,
    ):
        """
        Args:
            recommend_json_path: Path to recommend.json mapping topic ->
                {"docs": [...], "videos": [...]}.
            emb_model_name: sentence-transformers model name to load.
            index_path: File path for the persisted FAISS index.
            meta_path: File path for the persisted per-vector metadata.

        Raises:
            FileNotFoundError: If recommend.json is missing.
            RuntimeError: If the embedding model cannot be loaded.
        """
        self.recommend_json_path = recommend_json_path
        self.emb_model_name = emb_model_name
        self.index_path = index_path
        self.meta_path = meta_path
        self.model = None  # SentenceTransformer instance, loaded below
        self.index = None  # FAISS index, built or loaded on demand
        self.meta: List[Dict[str, Any]] = []  # one metadata dict per vector
        self.topics_map: Dict[str, Any] = {}  # raw recommend.json contents

        self._load_json()
        self._maybe_init_embedding_model()

        # Reuse a previously persisted index when both files exist;
        # otherwise defer building until the first query.
        if os.path.exists(self.index_path) and os.path.exists(self.meta_path):
            try:
                self._load_index()
            except Exception:
                logger.exception("Index load failed — will rebuild on demand.")
        else:
            logger.info("No index found — will build when first used.")

    def _load_json(self):
        """Load recommend.json into ``self.topics_map``.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        if not os.path.exists(self.recommend_json_path):
            raise FileNotFoundError(f"recommend.json not found at: {self.recommend_json_path}")
        with open(self.recommend_json_path, "r", encoding="utf-8") as f:
            self.topics_map = json.load(f)
        logger.info(f"Loaded recommend.json with {len(self.topics_map)} topics.")

    def _maybe_init_embedding_model(self):
        """Load the sentence-transformers model; no-op if already loaded.

        Raises:
            RuntimeError: If the model (or its dependencies) is unavailable.
        """
        if self.model is not None:
            return
        try:
            from sentence_transformers import SentenceTransformer
            self.model = SentenceTransformer(self.emb_model_name)
            logger.info(f"Loaded embedding model: {self.emb_model_name}")
        except Exception as e:
            logger.exception(f"Failed to load embedding model: {e}")
            # Chain the original cause so dependency problems stay visible.
            raise RuntimeError("Embedding model not available. Please check dependencies.") from e

    def build_index(self, force: bool = False):
        """
        Build the FAISS index from recommend.json.

        Each doc/video URL becomes one searchable vector. The index and
        metadata are persisted to disk when the filesystem is writable.

        Args:
            force: Rebuild even if an index is already in memory.

        Raises:
            ValueError: If recommend.json contains no docs/videos.
            RuntimeError: If FAISS index construction fails.
        """
        if self.index is not None and not force:
            logger.info("Index already built; skipping rebuild.")
            return

        # Import locally so this raises a clear ImportError (not a NameError)
        # when the module-level best-effort import failed — consistent with
        # _load_index and recommend_for_query.
        import faiss

        items: List[Dict[str, Any]] = []
        texts: List[str] = []
        for topic, val in self.topics_map.items():
            for d in val.get("docs", []):
                items.append({"topic": topic, "type": "doc", "url": d})
                texts.append(f"{topic} doc {d}")
            for v in val.get("videos", []):
                items.append({"topic": topic, "type": "video", "url": v})
                texts.append(f"{topic} video {v}")
        if not texts:
            raise ValueError("No docs/videos found in recommend.json to index.")

        self._maybe_init_embedding_model()  # guard: ensure model is loaded
        logger.info(f"Encoding {len(texts)} recommendation entries...")
        emb = self.model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        # Normalized vectors make inner product equal cosine similarity.
        faiss.normalize_L2(emb)
        dim = emb.shape[1]
        try:
            index = faiss.IndexFlatIP(dim)
            index.add(emb)
            self.index = index
            self.meta = items
            try:
                faiss.write_index(self.index, self.index_path)
                with open(self.meta_path, "w", encoding="utf-8") as f:
                    json.dump(self.meta, f, ensure_ascii=False, indent=2)
                logger.info(f"Saved FAISS index and metadata ({len(items)} items).")
            except Exception:
                # Read-only filesystems (e.g. Hugging Face Spaces) are an
                # expected case; the in-memory index still works.
                logger.warning("Could not persist index — running in memory only (likely Hugging Face Space).")
        except Exception as e:
            logger.exception(f"Failed to build FAISS index: {e}")
            raise RuntimeError(f"Index build failed: {e}") from e

    def _load_index(self):
        """Load the persisted FAISS index and metadata files into memory."""
        import faiss
        self.index = faiss.read_index(self.index_path)
        with open(self.meta_path, "r", encoding="utf-8") as f:
            self.meta = json.load(f)
        logger.info(f"Loaded FAISS index with {len(self.meta)} entries.")

    def recommend_for_query(self, query: str, top_k: int = 5, topic_boost: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Return up to ``top_k`` recommended items for ``query``.

        Uses cosine similarity (normalized inner product) over the FAISS
        index, building the index on first use.

        Args:
            query: Free-text query to embed and search with.
            top_k: Maximum number of results to return.
            topic_boost: If given, results whose topic matches it
                (case-insensitive) are sorted first.

        Returns:
            List of dicts with keys "topic", "type", "url" and "score",
            deduplicated by (url, type) and sorted by descending score.
        """
        if self.index is None:
            logger.info("Index not found in memory — building now.")
            self.build_index()

        import faiss  # local import mirrors the build/load paths

        q_emb = self.model.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(q_emb)
        # Over-fetch (3x) so deduplication still leaves top_k unique items.
        scores, ids = self.index.search(q_emb, top_k * 3)

        results: List[Dict[str, Any]] = []
        seen = set()
        for score, idx in zip(scores[0], ids[0]):
            if idx < 0:  # FAISS pads with -1 when fewer neighbors exist
                continue
            meta = self.meta[idx]
            key = (meta.get("url"), meta.get("type"))
            if key in seen:
                continue
            seen.add(key)
            results.append({
                "topic": meta.get("topic"),
                "type": meta.get("type"),
                "url": meta.get("url"),
                "score": float(score),
            })
            if len(results) >= top_k:
                break

        if topic_boost:
            # Boosted topic first; within each group, highest score first.
            results.sort(
                key=lambda r: (0 if r["topic"].lower() == topic_boost.lower() else 1, -r["score"])
            )
        else:
            results.sort(key=lambda r: -r["score"])
        logger.info(f"Recommended {len(results)} items for query: '{query}'")
        return results