import os import io import csv import time import threading from datetime import datetime, timezone from typing import Dict, Optional, Tuple import gradio as gr from huggingface_hub import HfApi # ------------------------- # Config # ------------------------- DATASET_REPO_ID = os.environ.get("DATASET_REPO_ID") # Space secret HF_TOKEN = os.environ["HF_TOKEN"] # Space secret KEYS_PATH = "keys.csv" B_KEYS_PATH = "b_keys.csv" api = HfApi(token=HF_TOKEN) _lock = threading.Lock() # Simple cache to reduce Hub reads _cache = { "ts": 0.0, "keys": None, # type: ignore "b_keys": None, # type: ignore } CACHE_TTL_SEC = 10.0 # ------------------------- # Helpers # ------------------------- def _utc_now_iso() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat() def _download_csv(repo_id: str, path_in_repo: str) -> str: # Download file bytes from the dataset repo, return text file_bytes = api.hf_hub_download( repo_id=repo_id, filename=path_in_repo, repo_type="dataset", token=HF_TOKEN, ) with open(file_bytes, "r", encoding="utf-8") as f: return f.read() def _upload_csv(repo_id: str, path_in_repo: str, content: str, commit_message: str) -> None: api.upload_file( path_or_fileobj=io.BytesIO(content.encode("utf-8")), path_in_repo=path_in_repo, repo_id=repo_id, repo_type="dataset", commit_message=commit_message, token=HF_TOKEN, ) def _parse_keys(csv_text: str) -> list: keys = [] reader = csv.DictReader(io.StringIO(csv_text)) for r in reader: if r["key"].strip(): keys.append({ "key": r["key"].strip(), "sent": (r.get("sent") or "").strip(), }) return keys def _parse_b_keys(csv_text: str) -> list: keys = [] reader = csv.DictReader(io.StringIO(csv_text)) for r in reader: if r["key"].strip(): keys.append({ "key": r["key"].strip(), "count": int(r.get("count") or "0"), }) return keys def _serialize_keys(keys: list) -> str: out = io.StringIO() fieldnames = ["key", "sent"] w = csv.DictWriter(out, fieldnames=fieldnames) w.writeheader() for k in keys: w.writerow({ "key": k["key"], "sent": k.get("sent", ""), }) return out.getvalue() def _serialize_b_keys(keys: list) -> str: out = io.StringIO() fieldnames = ["key", "count"] w = csv.DictWriter(out, fieldnames=fieldnames) w.writeheader() for k in keys: w.writerow({ "key": k["key"], "count": k.get("count", 0), }) return out.getvalue() def load_state(force: bool = False): now = time.time() if (not force) and _cache["keys"] is not None and (now - _cache["ts"] < CACHE_TTL_SEC): return _cache["keys"], _cache["b_keys"] keys_csv = _download_csv(DATASET_REPO_ID, KEYS_PATH) b_keys_csv = _download_csv(DATASET_REPO_ID, B_KEYS_PATH) keys = _parse_keys(keys_csv) b_keys = _parse_b_keys(b_keys_csv) _cache["ts"] = now _cache["keys"] = keys _cache["b_keys"] = b_keys return keys, b_keys # ------------------------- # Core API # ------------------------- def claim_key( request: Optional[gr.Request] = None, ) -> Tuple[str, str]: """ Returns the next available key. Returns (key, status_message) """ _ = request with _lock: keys, _ = load_state(force=True) # Find available keys available_keys = [k for k in keys if not k["sent"]] if len(available_keys) < 1: return "", "No available keys remaining." # Send the next available key key_to_send = available_keys[0] # Mark key as sent key_to_send["sent"] = _utc_now_iso() # Save changes updated_keys_csv = _serialize_keys(keys) _upload_csv( DATASET_REPO_ID, KEYS_PATH, updated_keys_csv, commit_message=f"Key sent at {_utc_now_iso()}", ) # refresh cache immediately _cache["ts"] = 0.0 return key_to_send["key"], "Key sent successfully." def claim_b_key( request: Optional[gr.Request] = None, ) -> Tuple[str, str]: """ Returns the key and increments the usage count. Returns (b_key, status_message) """ _ = request with _lock: keys, b_keys = load_state(force=True) if not b_keys or len(b_keys) == 0: return "", "No keys available." # Get the first key (you can add logic to rotate if needed) b_key = b_keys[0] # Increment the count b_key["count"] += 1 # Save changes updated_b_keys_csv = _serialize_b_keys(b_keys) _upload_csv( DATASET_REPO_ID, B_KEYS_PATH, updated_b_keys_csv, commit_message=f"Key requested at {_utc_now_iso()}", ) # refresh cache immediately _cache["ts"] = 0.0 return b_key["key"], f"Key provided." # ------------------------- # UI # ------------------------- with gr.Blocks(title="API") as demo: gr.Markdown("## API Service") with gr.Tab("Service A"): btn_a = gr.Button("Request", variant="secondary", size="sm") out_a = gr.Textbox(label="Response", interactive=False, show_label=False) status_a = gr.Textbox(label="", interactive=False, show_label=False) btn_a.click( fn=claim_key, inputs=[], outputs=[out_a, status_a], api_name="claim_key", ) with gr.Tab("Service B"): btn_b = gr.Button("Request", variant="secondary", size="sm") out_b = gr.Textbox(label="Response", interactive=False, show_label=False) status_b = gr.Textbox(label="", interactive=False, show_label=False) btn_b.click( fn=claim_b_key, inputs=[], outputs=[out_b, status_b], api_name="claim_b_key", ) demo.queue() demo.launch()