# gradium_setup / app.py
# Author: andito (HF Staff) — "Update app.py", commit 64585e4 (verified)
import os
import io
import csv
import time
import threading
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple
import gradio as gr
from huggingface_hub import HfApi
# -------------------------
# Config
# -------------------------
DATASET_REPO_ID = os.environ.get("DATASET_REPO_ID") # Space secret; None when unset (downloads will then fail)
HF_TOKEN = os.environ["HF_TOKEN"] # Space secret; KeyError at import time if missing (fail fast)
KEYS_PATH = "keys.csv"  # one-shot keys: each row is handed out at most once (see claim_key)
B_KEYS_PATH = "b_keys.csv"  # shared key(s): row 0 is reused with a usage counter (see claim_b_key)
api = HfApi(token=HF_TOKEN)
# Serializes read-modify-write cycles against the dataset repo within THIS
# process only — NOTE(review): does not protect concurrent Space replicas.
_lock = threading.Lock()
# Simple cache to reduce Hub reads
_cache = {
"ts": 0.0,  # time.time() of the last successful load_state refresh
"keys": None, # type: ignore
"b_keys": None, # type: ignore
}
CACHE_TTL_SEC = 10.0  # cache lifetime; claim_* handlers bypass it via force=True
# -------------------------
# Helpers
# -------------------------
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def _download_csv(repo_id: str, path_in_repo: str) -> str:
    """Download a CSV file from the dataset repo and return its text.

    Args:
        repo_id: Hub dataset repo id (e.g. "user/my-dataset").
        path_in_repo: file path inside the repo (e.g. "keys.csv").

    Returns:
        The file contents decoded as UTF-8.
    """
    # hf_hub_download returns a *local path* to the cached file, not the
    # file bytes — the original name `file_bytes` was misleading.
    local_path = api.hf_hub_download(
        repo_id=repo_id,
        filename=path_in_repo,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    with open(local_path, "r", encoding="utf-8") as f:
        return f.read()
def _upload_csv(repo_id: str, path_in_repo: str, content: str, commit_message: str) -> None:
    """Commit `content` as a UTF-8 file at `path_in_repo` in the dataset repo."""
    payload = io.BytesIO(content.encode("utf-8"))
    api.upload_file(
        path_or_fileobj=payload,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        commit_message=commit_message,
        token=HF_TOKEN,
    )
def _parse_keys(csv_text: str) -> list:
keys = []
reader = csv.DictReader(io.StringIO(csv_text))
for r in reader:
if r["key"].strip():
keys.append({
"key": r["key"].strip(),
"sent": (r.get("sent") or "").strip(),
})
return keys
def _parse_b_keys(csv_text: str) -> list:
keys = []
reader = csv.DictReader(io.StringIO(csv_text))
for r in reader:
if r["key"].strip():
keys.append({
"key": r["key"].strip(),
"count": int(r.get("count") or "0"),
})
return keys
def _serialize_keys(keys: list) -> str:
out = io.StringIO()
fieldnames = ["key", "sent"]
w = csv.DictWriter(out, fieldnames=fieldnames)
w.writeheader()
for k in keys:
w.writerow({
"key": k["key"],
"sent": k.get("sent", ""),
})
return out.getvalue()
def _serialize_b_keys(keys: list) -> str:
out = io.StringIO()
fieldnames = ["key", "count"]
w = csv.DictWriter(out, fieldnames=fieldnames)
w.writeheader()
for k in keys:
w.writerow({
"key": k["key"],
"count": k.get("count", 0),
})
return out.getvalue()
def load_state(force: bool = False):
    """Return (keys, b_keys), served from the module cache when fresh.

    A cache hit requires all of: `force` is False, a keys list was
    previously stored, and the entry is younger than CACHE_TTL_SEC.
    On a miss both CSVs are re-downloaded, parsed, and cached together.
    """
    now = time.time()
    cache_is_fresh = (
        not force
        and _cache["keys"] is not None
        and now - _cache["ts"] < CACHE_TTL_SEC
    )
    if cache_is_fresh:
        return _cache["keys"], _cache["b_keys"]
    keys = _parse_keys(_download_csv(DATASET_REPO_ID, KEYS_PATH))
    b_keys = _parse_b_keys(_download_csv(DATASET_REPO_ID, B_KEYS_PATH))
    _cache["ts"] = now
    _cache["keys"] = keys
    _cache["b_keys"] = b_keys
    return keys, b_keys
# -------------------------
# Core API
# -------------------------
def claim_key(
    request: Optional[gr.Request] = None,
) -> Tuple[str, str]:
    """
    Hand out the next unclaimed key from keys.csv.

    Returns (key, status_message); the key is "" when none remain.
    """
    _ = request  # unused; kept so Gradio can pass the request object
    with _lock:
        # Always bypass the cache: another request may just have claimed a key.
        keys, _unused_b_keys = load_state(force=True)
        # An empty "sent" field marks a key as still available.
        claimable = [entry for entry in keys if not entry["sent"]]
        if not claimable:
            return "", "No available keys remaining."
        chosen = claimable[0]
        # Stamp the claim time; this mutates the entry inside `keys` too.
        chosen["sent"] = _utc_now_iso()
        _upload_csv(
            DATASET_REPO_ID,
            KEYS_PATH,
            _serialize_keys(keys),
            commit_message=f"Key sent at {_utc_now_iso()}",
        )
        # Expire the cache so the next load_state re-reads from the Hub.
        _cache["ts"] = 0.0
        return chosen["key"], "Key sent successfully."
def claim_b_key(
    request: Optional[gr.Request] = None,
) -> Tuple[str, str]:
    """
    Return the shared B key and increment its usage count.

    Always serves the first row of b_keys.csv (no rotation); the "count"
    column records how many times it has been handed out.
    Returns (b_key, status_message); the key is "" when no rows exist.
    """
    _ = request  # unused; kept so Gradio can pass the request object
    with _lock:
        # Always bypass the cache so the persisted count is current.
        keys, b_keys = load_state(force=True)
        # `not b_keys` already covers both None and an empty list; the
        # original `or len(b_keys) == 0` was redundant.
        if not b_keys:
            return "", "No keys available."
        # Get the first key (add rotation logic here if ever needed).
        b_key = b_keys[0]
        b_key["count"] += 1
        _upload_csv(
            DATASET_REPO_ID,
            B_KEYS_PATH,
            _serialize_b_keys(b_keys),
            commit_message=f"Key requested at {_utc_now_iso()}",
        )
        # Expire the cache so the next load_state re-reads from the Hub.
        _cache["ts"] = 0.0
        # Plain literal: the original f-string had no placeholders (F541).
        return b_key["key"], "Key provided."
# -------------------------
# UI
# -------------------------
# Two tabs, each wiring one button to its claim handler.  The api_name
# values ("claim_key" / "claim_b_key") are the stable REST endpoints.
with gr.Blocks(title="API") as demo:
    gr.Markdown("## API Service")

    with gr.Tab("Service A"):
        request_btn_a = gr.Button("Request", variant="secondary", size="sm")
        response_box_a = gr.Textbox(label="Response", interactive=False, show_label=False)
        status_box_a = gr.Textbox(label="", interactive=False, show_label=False)
        request_btn_a.click(
            fn=claim_key,
            inputs=[],
            outputs=[response_box_a, status_box_a],
            api_name="claim_key",
        )

    with gr.Tab("Service B"):
        request_btn_b = gr.Button("Request", variant="secondary", size="sm")
        response_box_b = gr.Textbox(label="Response", interactive=False, show_label=False)
        status_box_b = gr.Textbox(label="", interactive=False, show_label=False)
        request_btn_b.click(
            fn=claim_b_key,
            inputs=[],
            outputs=[response_box_b, status_box_b],
            api_name="claim_b_key",
        )

demo.queue()
demo.launch()