File size: 6,956 Bytes
b66240d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
from __future__ import annotations
import os, time, random
from typing import Dict, Any, List, Literal, Optional
import httpx
# Hugging Face Hub REST endpoints queried by HFRegistry.
HF_API_MODELS = "https://huggingface.co/api/models"
HF_API_DATASETS = "https://huggingface.co/api/datasets"

# Tunables read from the environment once, at import time.
REFRESH_INTERVAL_SEC = int(os.getenv("HF_REGISTRY_REFRESH_SEC", "21600"))  # default: 6 hours
HTTP_TIMEOUT = float(os.getenv("HF_HTTP_TIMEOUT", "8.0"))  # per-request timeout, seconds

# HF_MODE: "off" (no Hub access), "public" (anonymous requests) or
# "auth" (requests carry HF_TOKEN). Any unrecognised value means "off".
HF_MODE = os.getenv("HF_MODE", "off").lower()
HF_MODE = HF_MODE if HF_MODE in ("off", "public", "auth") else "off"

# The token is only read in "auth" mode; auth without a token degrades to "off".
HF_TOKEN = os.getenv("HF_TOKEN") if HF_MODE == "auth" else None
if HF_MODE == "auth" and not HF_TOKEN:
    HF_MODE = "off"
# Curated crypto datasets on the Hugging Face Hub, grouped by category.
# The keys are also the "category" values HFRegistry assigns to these datasets.
CRYPTO_DATASETS = {
    "price": [
        "paperswithbacktest/Cryptocurrencies-Daily-Price",
        "linxy/CryptoCoin",
        "sebdg/crypto_data",
        "Farmaanaa/bitcoin_price_timeseries",
        "WinkingFace/CryptoLM-Bitcoin-BTC-USDT",
        "WinkingFace/CryptoLM-Ethereum-ETH-USDT",
        "WinkingFace/CryptoLM-Ripple-XRP-USDT",
    ],
    "news_raw": [
        "flowfree/crypto-news-headlines",
        "edaschau/bitcoin_news",
    ],
    "news_labeled": [
        "SahandNZ/cryptonews-articles-with-price-momentum-labels",
        "tahamajs/bitcoin-individual-news-dataset",
        "tahamajs/bitcoin-enhanced-prediction-dataset-with-comprehensive-news",
        "tahamajs/bitcoin-prediction-dataset-with-local-news-summaries",
        "arad1367/Crypto_Semantic_News",
    ],
}

# Curated model ids seeded into the registry.
_SEED_MODELS = ["ElKulako/cryptobert", "kk08/CryptoBERT"]

# Every curated dataset id, flattened in category order. A comprehension is
# used so no loop variable leaks into (and gets exported from) module scope.
_SEED_DATASETS = [name for names in CRYPTO_DATASETS.values() for name in names]
class HFRegistry:
    """In-memory catalogue of crypto models and datasets from the Hugging Face Hub.

    Entries are dicts keyed by Hub id. :meth:`refresh` merges curated seeds
    (``_SEED_MODELS`` / ``CRYPTO_DATASETS``) with keyword-search results from
    the Hub API; :meth:`list` and :meth:`health` read the current state.
    """

    def __init__(self):
        self.models: Dict[str, Dict[str, Any]] = {}
        self.datasets: Dict[str, Dict[str, Any]] = {}
        # time.time() of the last refresh that got past seeding; 0.0 means never.
        self.last_refresh = 0.0
        # Why the most recent refresh failed; None when it succeeded.
        self.fail_reason: Optional[str] = None

    async def _hf_json(self, url: str, params: Dict[str, Any]) -> Any:
        """GET ``url`` with query ``params`` and return the decoded JSON body.

        A bearer token is sent only in "auth" mode. Raises
        ``httpx.HTTPStatusError`` for non-2xx responses and other ``httpx``
        errors for transport failures.
        """
        headers = {}
        if HF_MODE == "auth" and HF_TOKEN:
            headers["Authorization"] = f"Bearer {HF_TOKEN}"
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT, headers=headers) as client:
            r = await client.get(url, params=params)
            r.raise_for_status()
            return r.json()

    async def refresh(self) -> Dict[str, Any]:
        """Re-seed the registry and merge fresh search results from the Hub.

        Returns ``{"ok", "models", "datasets"}``, plus ``"error"`` when the
        refresh failed. Hub errors are recorded in ``fail_reason`` rather than
        raised; entries upserted before a failure are kept.
        """
        if HF_MODE == "off":
            self.fail_reason = "HF_MODE=off"
            return {"ok": False, "error": "HF_MODE=off", "models": 0, "datasets": 0}
        # Clear the previous outcome: without this a single failure stayed
        # sticky and every later successful refresh still reported ok=False.
        self.fail_reason = None
        try:
            self._seed()
            if HF_MODE in ("public", "auth"):
                try:
                    await self._fetch_models()
                    await self._fetch_datasets()
                except Exception as e:
                    self.fail_reason = self._describe_error(e)
            self.last_refresh = time.time()
            return self._summary()
        except Exception as e:
            self.fail_reason = str(e)[:200]
            return self._summary()

    def _seed(self) -> None:
        """Insert curated models/datasets, leaving existing entries untouched."""
        for name in _SEED_MODELS:
            self.models.setdefault(name, {"id": name, "source": "seed", "pipeline_tag": "sentiment-analysis"})
        for category, dataset_list in CRYPTO_DATASETS.items():
            for name in dataset_list:
                self.datasets.setdefault(name, {"id": name, "source": "seed", "category": category, "tags": ["crypto", category]})

    async def _fetch_models(self) -> None:
        """Search the Hub for crypto sentiment models and upsert each hit."""
        query = {"pipeline_tag": "sentiment-analysis", "search": "crypto", "limit": 50}
        models = await self._hf_json(HF_API_MODELS, query)
        for m in models or []:
            mid = m.get("modelId") or m.get("id") or m.get("name")
            if not mid:
                continue
            self.models[mid] = {
                "id": mid,
                "pipeline_tag": m.get("pipeline_tag"),
                "likes": m.get("likes"),
                "downloads": m.get("downloads"),
                "tags": m.get("tags") or [],
                "source": "hub",
            }

    async def _fetch_datasets(self) -> None:
        """Search the Hub for crypto datasets and upsert each hit with a category.

        Ids listed in ``CRYPTO_DATASETS`` keep their curated category; only
        uncurated hits are classified heuristically. Otherwise e.g.
        "SahandNZ/cryptonews-articles-with-price-momentum-labels" (curated as
        "news_labeled") would be relabelled "price" because its id contains
        "price".
        """
        curated = {name: cat for cat, names in CRYPTO_DATASETS.items() for name in names}
        query = {"search": "crypto", "limit": 100}
        datasets = await self._hf_json(HF_API_DATASETS, query)
        for d in datasets or []:
            did = d.get("id") or d.get("name")
            if not did:
                continue
            tags = d.get("tags") or []
            self.datasets[did] = {
                "id": did,
                "likes": d.get("likes"),
                "downloads": d.get("downloads"),
                "tags": tags,
                "category": curated.get(did) or self._classify_dataset(did, tags),
                "source": "hub",
            }

    @staticmethod
    def _classify_dataset(dataset_id: str, tags: Optional[List[str]]) -> str:
        """Guess "price", "news_labeled", "news_raw" or "other" from id and tags."""
        tags_str = " ".join(tags or []).lower()
        name_lower = dataset_id.lower()
        if "price" in tags_str or "ohlc" in tags_str or "price" in name_lower:
            return "price"
        if "news" in tags_str or "news" in name_lower:
            if "label" in tags_str or "sentiment" in tags_str:
                return "news_labeled"
            return "news_raw"
        return "other"

    @staticmethod
    def _describe_error(exc: Exception) -> str:
        """Return a short failure reason (at most 200 chars) for ``exc``."""
        message = str(exc)[:200]
        if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 401:
            return "Authentication failed"
        # Fallback for errors that only mention the status in their text.
        if "401" in message or "unauthorized" in message.lower():
            return "Authentication failed"
        return message

    def _summary(self) -> Dict[str, Any]:
        """Build the result dict returned by :meth:`refresh`."""
        counts = {"models": len(self.models), "datasets": len(self.datasets)}
        if self.fail_reason is None:
            return {"ok": True, **counts}
        return {"ok": False, "error": self.fail_reason, **counts}

    def list(self, kind: Literal["models","datasets"]="models", category: Optional[str]=None) -> List[Dict[str, Any]]:
        """Return entries of ``kind`` (anything but "models" means datasets).

        ``category`` filters datasets only; it is ignored for models.
        """
        # Inside a method body ``list`` is still the builtin, not this method.
        items = list(self.models.values()) if kind == "models" else list(self.datasets.values())
        if category and kind == "datasets":
            items = [d for d in items if d.get("category") == category]
        return items

    def health(self):
        """Return a monitoring snapshot of the registry.

        ``ok`` is True only after at least one refresh with no recorded
        failure. NOTE: before the first refresh ``age_sec`` is measured from
        the Unix epoch (``last_refresh`` is 0.0).
        """
        age = time.time() - (self.last_refresh or 0)
        return {
            "ok": self.last_refresh > 0 and (self.fail_reason is None),
            "last_refresh_epoch": self.last_refresh,
            "age_sec": age,
            "fail_reason": self.fail_reason,
            "counts": {"models": len(self.models), "datasets": len(self.datasets)},
            "interval_sec": REFRESH_INTERVAL_SEC,
        }
# Module-level registry instance; periodic_refresh() refreshes this object.
REGISTRY: HFRegistry = HFRegistry()
async def periodic_refresh(loop_sleep: int = REFRESH_INTERVAL_SEC):
    """Refresh ``REGISTRY`` in an endless loop.

    The first refresh runs immediately. The first pause is a random 50-90 %
    of ``loop_sleep`` (jitter, presumably so several processes do not
    refresh in lockstep). Every later pause is exactly ``loop_sleep`` seconds.
    """
    first_pass = True
    while True:
        await REGISTRY.refresh()
        if first_pass:
            delay = int(loop_sleep * random.uniform(0.5, 0.9))
            first_pass = False
        else:
            delay = loop_sleep
        await _sleep(delay)
async def _sleep(sec: int):
import asyncio
try:
await asyncio.sleep(sec)
except: pass
|