Spaces:
Running
Running
| import os | |
| import io | |
| import csv | |
| import time | |
| import threading | |
| from datetime import datetime, timezone | |
| from typing import Dict, Optional, Tuple | |
| import gradio as gr | |
| from huggingface_hub import HfApi | |
| # ------------------------- | |
| # Config | |
| # ------------------------- | |
# Both values are expected to be configured as Space secrets.
DATASET_REPO_ID = os.getenv("DATASET_REPO_ID")  # None when the variable is unset
HF_TOKEN = os.environ["HF_TOKEN"]  # raises KeyError when the variable is unset

# Paths of the two CSV files inside the dataset repo.
KEYS_PATH = "keys.csv"
B_KEYS_PATH = "b_keys.csv"

# Shared Hub client, plus the lock the claim_* handlers hold around each
# read-modify-write cycle.
api = HfApi(token=HF_TOKEN)
_lock = threading.Lock()

# Simple cache to reduce Hub reads: "ts" is the time.time() value recorded by
# the last load, "keys"/"b_keys" hold the parsed rows (None until first load).
CACHE_TTL_SEC = 10.0
_cache = dict(ts=0.0, keys=None, b_keys=None)
| # ------------------------- | |
| # Helpers | |
| # ------------------------- | |
def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with whole seconds."""
    current = datetime.now(tz=timezone.utc)
    # Drop sub-second precision so the stamp has the fixed
    # YYYY-MM-DDTHH:MM:SS+00:00 shape.
    truncated = current.replace(microsecond=0)
    return truncated.isoformat()
def _download_csv(repo_id: str, path_in_repo: str) -> str:
    """Fetch one file from a dataset repo on the Hub and return it as text.

    Args:
        repo_id: Identifier of the dataset repository.
        path_in_repo: Path of the file inside that repository.

    Returns:
        The file contents decoded as UTF-8.
    """
    # The download call yields something open() accepts, i.e. a local path,
    # which is then read back as text.
    local_path = api.hf_hub_download(
        repo_type="dataset",
        repo_id=repo_id,
        filename=path_in_repo,
        token=HF_TOKEN,
    )
    with open(local_path, mode="r", encoding="utf-8") as handle:
        text = handle.read()
    return text
def _upload_csv(repo_id: str, path_in_repo: str, content: str, commit_message: str) -> None:
    """Upload ``content`` as a file in a dataset repo on the Hub.

    Args:
        repo_id: Identifier of the dataset repository.
        path_in_repo: Destination path inside the repository.
        content: Text to store; it is encoded as UTF-8 before upload.
        commit_message: Message passed along with the upload.
    """
    payload = io.BytesIO(content.encode("utf-8"))
    api.upload_file(
        repo_type="dataset",
        repo_id=repo_id,
        path_in_repo=path_in_repo,
        path_or_fileobj=payload,
        commit_message=commit_message,
        token=HF_TOKEN,
    )
def _parse_keys(csv_text: str) -> list:
    """Parse the one-time key CSV into a list of ``{"key", "sent"}`` dicts.

    The first CSV row is treated as the header. Rows whose ``key`` cell is
    blank or absent are skipped. Both values are stripped of surrounding
    whitespace; a missing ``sent`` cell or column becomes ``""``.

    Args:
        csv_text: Full text of the CSV file.

    Returns:
        One dict per usable row, in file order.

    Raises:
        KeyError: If a data row exists but the header has no ``key`` column.
    """
    parsed = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # DictReader fills cells that a short row does not provide with None,
        # so guard before stripping instead of crashing with AttributeError.
        key = (row["key"] or "").strip()
        if not key:
            continue
        parsed.append({
            "key": key,
            "sent": (row.get("sent") or "").strip(),
        })
    return parsed
def _parse_b_keys(csv_text: str) -> list:
    """Parse the counted-key CSV into a list of ``{"key", "count"}`` dicts.

    The first CSV row is treated as the header. Rows whose ``key`` cell is
    blank or absent are skipped. A missing, empty or whitespace-only
    ``count`` cell is read as ``0``.

    Args:
        csv_text: Full text of the CSV file.

    Returns:
        One dict per usable row, in file order, with ``count`` as an int.

    Raises:
        KeyError: If a data row exists but the header has no ``key`` column.
        ValueError: If a ``count`` cell holds text that is not an integer.
    """
    parsed = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # DictReader fills cells that a short row does not provide with None,
        # so guard before stripping instead of crashing with AttributeError.
        key = (row["key"] or "").strip()
        if not key:
            continue
        # Strip first so a cell containing only spaces counts as empty.
        count_text = (row.get("count") or "").strip()
        parsed.append({
            "key": key,
            "count": int(count_text or "0"),
        })
    return parsed
def _serialize_keys(keys: list) -> str:
    """Render key entries as CSV text with a ``key,sent`` header row.

    Args:
        keys: Dicts that each contain ``"key"`` and optionally ``"sent"``;
            a missing ``"sent"`` is written as an empty cell.

    Returns:
        The CSV document, header included, as a single string.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["key", "sent"])
    for entry in keys:
        writer.writerow([entry["key"], entry.get("sent", "")])
    return buffer.getvalue()
def _serialize_b_keys(keys: list) -> str:
    """Render counted-key entries as CSV text with a ``key,count`` header row.

    Args:
        keys: Dicts that each contain ``"key"`` and optionally ``"count"``;
            a missing ``"count"`` is written as ``0``.

    Returns:
        The CSV document, header included, as a single string.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["key", "count"])
    for entry in keys:
        writer.writerow([entry["key"], entry.get("count", 0)])
    return buffer.getvalue()
def load_state(force: bool = False):
    """Return ``(keys, b_keys)``, reading from the Hub unless the cache is usable.

    The cached pair is returned when ``force`` is false, a previous load has
    populated the cache, and that load is less than ``CACHE_TTL_SEC`` seconds
    old. Otherwise both CSV files are downloaded and parsed, and the cache is
    overwritten with the result.

    Args:
        force: When true, always download fresh copies.

    Returns:
        A tuple of the parsed ``keys.csv`` rows and the parsed
        ``b_keys.csv`` rows.
    """
    started = time.time()

    if not force:
        populated = _cache["keys"] is not None
        if populated and started - _cache["ts"] < CACHE_TTL_SEC:
            return _cache["keys"], _cache["b_keys"]

    # Fetch both files before parsing either one.
    raw_keys = _download_csv(DATASET_REPO_ID, KEYS_PATH)
    raw_b_keys = _download_csv(DATASET_REPO_ID, B_KEYS_PATH)
    keys = _parse_keys(raw_keys)
    b_keys = _parse_b_keys(raw_b_keys)

    # The timestamp is the moment this call began, not when downloads ended.
    _cache.update(ts=started, keys=keys, b_keys=b_keys)
    return keys, b_keys
| # ------------------------- | |
| # Core API | |
| # ------------------------- | |
def claim_key(
    request: Optional[gr.Request] = None,
) -> Tuple[str, str]:
    """Hand out the first key that has not been sent yet and record it as sent.

    The whole read-modify-write cycle runs while holding ``_lock``: state is
    force-reloaded, the first entry with an empty ``sent`` field is stamped
    with the current UTC time, and the full key list is uploaded back to
    ``KEYS_PATH``.

    Args:
        request: Not used by the body; kept in the signature.

    Returns:
        ``(key, status_message)``. ``key`` is ``""`` when every key has
        already been sent.
    """
    _ = request
    with _lock:
        keys = load_state(force=True)[0]

        unsent = [entry for entry in keys if not entry["sent"]]
        if not unsent:
            return "", "No available keys remaining."

        # `chosen` is the same dict object that lives in `keys`, so stamping
        # it here is what ends up in the serialized CSV below.
        chosen = unsent[0]
        chosen["sent"] = _utc_now_iso()

        _upload_csv(
            DATASET_REPO_ID,
            KEYS_PATH,
            _serialize_keys(keys),
            commit_message=f"Key sent at {_utc_now_iso()}",
        )

        # Zero the timestamp so the next non-forced load_state() re-reads.
        _cache["ts"] = 0.0
        return chosen["key"], "Key sent successfully."
def claim_b_key(
    request: Optional[gr.Request] = None,
) -> Tuple[str, str]:
    """Return the first counted key and increment its usage count.

    The whole read-modify-write cycle runs while holding ``_lock``: state is
    force-reloaded, the first entry's ``count`` is incremented, and the full
    list is uploaded back to ``B_KEYS_PATH``.

    Args:
        request: Not used by the body; kept in the signature.

    Returns:
        ``(b_key, status_message)``. ``b_key`` is ``""`` when the counted-key
        list is empty.
    """
    _ = request
    with _lock:
        # Only the counted keys are needed here; the one-time keys are ignored.
        _, b_keys = load_state(force=True)
        if not b_keys:
            return "", "No keys available."

        # Get the first key (you can add logic to rotate if needed)
        b_key = b_keys[0]
        b_key["count"] += 1

        _upload_csv(
            DATASET_REPO_ID,
            B_KEYS_PATH,
            _serialize_b_keys(b_keys),
            commit_message=f"Key requested at {_utc_now_iso()}",
        )

        # Zero the timestamp so the next non-forced load_state() re-reads.
        _cache["ts"] = 0.0
        return b_key["key"], "Key provided."
| # ------------------------- | |
| # UI | |
| # ------------------------- | |
with gr.Blocks(title="API") as demo:
    gr.Markdown("## API Service")

    # Tab A: one button wired to claim_key; the two textboxes receive the
    # (key, status_message) pair it returns.
    with gr.Tab("Service A"):
        btn_a = gr.Button("Request", size="sm", variant="secondary")
        out_a = gr.Textbox(show_label=False, interactive=False, label="Response")
        status_a = gr.Textbox(show_label=False, interactive=False, label="")
        btn_a.click(
            api_name="claim_key",
            fn=claim_key,
            inputs=[],
            outputs=[out_a, status_a],
        )

    # Tab B: same layout, wired to claim_b_key.
    with gr.Tab("Service B"):
        btn_b = gr.Button("Request", size="sm", variant="secondary")
        out_b = gr.Textbox(show_label=False, interactive=False, label="Response")
        status_b = gr.Textbox(show_label=False, interactive=False, label="")
        btn_b.click(
            api_name="claim_b_key",
            fn=claim_b_key,
            inputs=[],
            outputs=[out_b, status_b],
        )

# Enable the request queue, then start the app.
demo.queue()
demo.launch()