| """ | |
| Advanced Token Manager for Hugging Face authentication | |
| Supports persistent storage with encryption and multiple token types | |
| """ | |
| import os | |
| import sqlite3 | |
| import logging | |
| import json | |
| from typing import Dict, Any, List, Optional | |
| from pathlib import Path | |
| from cryptography.fernet import Fernet | |
| from cryptography.hazmat.primitives import hashes | |
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | |
| import base64 | |
| from datetime import datetime | |
| logger = logging.getLogger(__name__) | |
class TokenManager:
    """
    Advanced token manager with encryption and persistent storage.
    """

    def __init__(self, db_path: str = "database/tokens.db"):
        """
        Initialize the token manager.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        # Initialize encryption
        self.encryption_key = self._get_or_create_encryption_key()
        self.cipher = Fernet(self.encryption_key)

        # Initialize database
        self._init_database()

        # Token type definitions (defined before _load_env_tokens, because
        # save_token validates token types against this mapping)
        self.token_types = {
            'read': {
                'name': 'Read Token',
                'description': 'Read-only token for repositories',
                'permissions': ['read_public_repos', 'read_private_repos_with_access',
                                'download_models', 'download_datasets'],
                'restrictions': ['cannot_upload', 'cannot_create_repos', 'cannot_modify_content'],
                'use_cases': ['Downloading models for training', 'Accessing private data',
                              'Development and testing'],
                'security_level': 'medium',
                'recommended_for': 'development'
            },
            'write': {
                'name': 'Write Token',
                'description': 'Token with full read and write access',
                'permissions': ['all_read_permissions', 'upload_files', 'create_repositories',
                                'modify_content', 'manage_repo_settings', 'delete_files'],
                'restrictions': ['limited_by_account_permissions'],
                'use_cases': ['Uploading trained models', 'Sharing results with the community',
                              'Managing personal projects'],
                'security_level': 'high',
                'recommended_for': 'production'
            },
            'fine_grained': {
                'name': 'Fine-grained Token',
                'description': 'Token with custom, scoped permissions',
                'permissions': ['custom_per_repository', 'granular_access_control',
                                'time_limited_access', 'ip_restricted_access'],
                'restrictions': ['repository_specific', 'time_limited', 'ip_restricted'],
                'use_cases': ['Commercial projects', 'Sensitive data', 'Large teams'],
                'security_level': 'very_high',
                'recommended_for': 'enterprise'
            }
        }

        # Load tokens from environment variables (relies on self.token_types)
        self._load_env_tokens()

        logger.info("Token Manager initialized")
    def _load_env_tokens(self):
        """Load tokens from environment variables."""
        env_tokens = {
            'read_token': {
                'token': os.getenv('HF_TOKEN_READ'),
                'type': 'read',
                'description': 'Read token from environment variables - for development and learning'
            },
            'write_token': {
                'token': os.getenv('HF_TOKEN_WRITE'),
                'type': 'write',
                'description': 'Write token from environment variables - for sharing models'
            },
            'fine_grained_token': {
                'token': os.getenv('HF_TOKEN_FINE_GRAINED'),
                'type': 'fine_grained',
                'description': 'Fine-grained token from environment variables - for commercial projects'
            }
        }

        # Save tokens from the environment if present and not already stored
        for name, token_info in env_tokens.items():
            if token_info['token']:
                existing_token = self.get_token(name)
                if not existing_token:
                    success = self.save_token(
                        name=name,
                        token=token_info['token'],
                        token_type=token_info['type'],
                        description=token_info['description'],
                        is_default=(token_info['type'] == 'read')  # Read token is the default
                    )
                    if success:
                        logger.info(f"Loaded {token_info['type']} token from environment")
    def get_token_for_task(self, task_type: str = 'read') -> Optional[str]:
        """
        Get the appropriate token for a specific task.

        Args:
            task_type: Type of task (read, write, medical, private, upload,
                download, commercial, enterprise)

        Returns:
            Appropriate token for the task, or None if no token is available
        """
        # Map task types to token names, in order of preference
        task_token_map = {
            'read': ['read_token', 'fine_grained_token', 'write_token'],
            'download': ['read_token', 'fine_grained_token', 'write_token'],
            'write': ['write_token', 'fine_grained_token'],
            'upload': ['write_token', 'fine_grained_token'],
            'medical': ['fine_grained_token', 'write_token', 'read_token'],
            'private': ['fine_grained_token', 'write_token'],
            'commercial': ['fine_grained_token'],
            'enterprise': ['fine_grained_token']
        }

        preferred_tokens = task_token_map.get(task_type, ['read_token'])

        # Try stored tokens in order of preference
        for token_name in preferred_tokens:
            token = self.get_token(token_name)
            if token:
                logger.debug(f"Using {token_name} for task: {task_type}")
                return token

        # Fall back to the default token
        default_token = self.get_token()
        if default_token:
            logger.debug(f"Using default token for task: {task_type}")
            return default_token

        # Last resort: try environment variables directly
        env_fallbacks = {
            'read': 'HF_TOKEN_READ',
            'write': 'HF_TOKEN_WRITE',
            'medical': 'HF_TOKEN_FINE_GRAINED',
            'private': 'HF_TOKEN_FINE_GRAINED'
        }
        env_var = env_fallbacks.get(task_type, 'HF_TOKEN')
        env_token = os.getenv(env_var)
        if env_token:
            logger.debug(f"Using environment token {env_var} for task: {task_type}")
            return env_token

        logger.warning(f"No suitable token found for task: {task_type}")
        return None
    def _get_or_create_encryption_key(self) -> bytes:
        """Get or create the encryption key for token storage."""
        key_file = self.db_path.parent / ".token_key"

        if key_file.exists():
            with open(key_file, 'rb') as f:
                return f.read()

        # Generate a new key by deriving it from random bytes. Since the
        # password is random and only the derived key is persisted, this is
        # effectively equivalent to generating a random Fernet key.
        password = os.urandom(32)
        salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password))

        # Save the key with restrictive file permissions
        with open(key_file, 'wb') as f:
            f.write(key)
        os.chmod(key_file, 0o600)

        logger.info("Created new encryption key")
        return key
    def _init_database(self):
        """Initialize the SQLite database."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS tokens (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE NOT NULL,
                    token_type TEXT NOT NULL,
                    encrypted_token TEXT NOT NULL,
                    is_default BOOLEAN DEFAULT FALSE,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    usage_count INTEGER DEFAULT 0,
                    is_active BOOLEAN DEFAULT TRUE
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS token_usage_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    token_name TEXT NOT NULL,
                    operation TEXT NOT NULL,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    success BOOLEAN,
                    error_message TEXT
                )
            ''')
            conn.commit()
        logger.info("Database initialized")
    def save_token(self, name: str, token: str, token_type: str = 'read',
                   description: str = '', is_default: bool = False) -> bool:
        """
        Save an encrypted token to the database.

        Args:
            name: Token name/identifier
            token: HF token string
            token_type: Type of token (read/write/fine_grained)
            description: Optional description
            is_default: Whether this should be the default token

        Returns:
            Success status
        """
        try:
            # Validate token type
            if token_type not in self.token_types:
                raise ValueError(f"Invalid token type: {token_type}")

            # Encrypt token
            encrypted_token = self.cipher.encrypt(token.encode()).decode()

            with sqlite3.connect(self.db_path) as conn:
                # If setting as default, unset other defaults
                if is_default:
                    conn.execute('UPDATE tokens SET is_default = FALSE')

                # Insert or update the token
                conn.execute('''
                    INSERT OR REPLACE INTO tokens
                    (name, token_type, encrypted_token, is_default, description, created_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (name, token_type, encrypted_token, is_default, description, datetime.now()))
                conn.commit()

            logger.info(f"Saved token '{name}' of type '{token_type}'")
            return True
        except Exception as e:
            logger.error(f"Failed to save token '{name}': {e}")
            return False
    def get_token(self, name: Optional[str] = None) -> Optional[str]:
        """
        Get a decrypted token by name, or the default token.

        Args:
            name: Token name (if None, returns the default token)

        Returns:
            Decrypted token string or None
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                if name:
                    cursor = conn.execute(
                        'SELECT encrypted_token, name FROM tokens WHERE name = ? AND is_active = TRUE',
                        (name,)
                    )
                else:
                    cursor = conn.execute(
                        'SELECT encrypted_token, name FROM tokens WHERE is_default = TRUE AND is_active = TRUE'
                    )
                result = cursor.fetchone()

            if result:
                encrypted_token, token_name = result

                # Decrypt token
                decrypted_token = self.cipher.decrypt(encrypted_token.encode()).decode()

                # Update usage statistics
                self._update_token_usage(token_name)
                return decrypted_token
            return None
        except Exception as e:
            logger.error(f"Failed to get token '{name}': {e}")
            return None
    def list_tokens(self) -> List[Dict[str, Any]]:
        """
        List all saved tokens (without decrypting them).

        Returns:
            List of token information
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('''
                    SELECT name, token_type, is_default, description, created_at,
                           last_used, usage_count, is_active
                    FROM tokens
                    ORDER BY is_default DESC, created_at DESC
                ''')
                tokens = []
                for row in cursor.fetchall():
                    token_info = {
                        'name': row[0],
                        'type': row[1],
                        'type_info': self.token_types.get(row[1], {}),
                        'is_default': bool(row[2]),
                        'description': row[3],
                        'created_at': row[4],
                        'last_used': row[5],
                        'usage_count': row[6],
                        'is_active': bool(row[7])
                    }
                    tokens.append(token_info)
                return tokens
        except Exception as e:
            logger.error(f"Failed to list tokens: {e}")
            return []
    def delete_token(self, name: str) -> bool:
        """
        Delete a token from the database.

        Args:
            name: Token name to delete

        Returns:
            Success status
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute('DELETE FROM tokens WHERE name = ?', (name,))
                if cursor.rowcount > 0:
                    conn.commit()
                    logger.info(f"Deleted token '{name}'")
                    return True
                logger.warning(f"Token '{name}' not found")
                return False
        except Exception as e:
            logger.error(f"Failed to delete token '{name}': {e}")
            return False
    def set_default_token(self, name: str) -> bool:
        """
        Set a token as the default.

        Args:
            name: Token name to set as default

        Returns:
            Success status
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                # Check that the token exists
                cursor = conn.execute('SELECT id FROM tokens WHERE name = ?', (name,))
                if not cursor.fetchone():
                    logger.error(f"Token '{name}' not found")
                    return False

                # Unset all defaults, then set the new one
                conn.execute('UPDATE tokens SET is_default = FALSE')
                conn.execute('UPDATE tokens SET is_default = TRUE WHERE name = ?', (name,))
                conn.commit()

            logger.info(f"Set '{name}' as default token")
            return True
        except Exception as e:
            logger.error(f"Failed to set default token '{name}': {e}")
            return False
    def validate_token(self, token: str) -> Dict[str, Any]:
        """
        Validate an HF token by testing API access.

        Args:
            token: Token to validate

        Returns:
            Validation result
        """
        try:
            # Imported lazily so the manager works without huggingface_hub installed
            from huggingface_hub import HfApi

            api = HfApi(token=token)
            user_info = api.whoami()
            return {
                'valid': True,
                'username': user_info.get('name', 'unknown'),
                'email': user_info.get('email', ''),
                'plan': user_info.get('plan', 'free'),
                'message': 'Token is valid and working'
            }
        except Exception as e:
            return {
                'valid': False,
                'error': str(e),
                'message': 'Token validation failed'
            }
    def _update_token_usage(self, token_name: str):
        """Update token usage statistics."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    UPDATE tokens
                    SET last_used = ?, usage_count = usage_count + 1
                    WHERE name = ?
                ''', (datetime.now(), token_name))
                conn.commit()
        except Exception as e:
            logger.error(f"Failed to update token usage: {e}")

    def log_token_usage(self, token_name: str, operation: str,
                        success: bool, error_message: str = ''):
        """Log token usage for auditing."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    INSERT INTO token_usage_log
                    (token_name, operation, success, error_message)
                    VALUES (?, ?, ?, ?)
                ''', (token_name, operation, success, error_message))
                conn.commit()
        except Exception as e:
            logger.error(f"Failed to log token usage: {e}")
    def get_token_recommendations(self, intended_use: str) -> Dict[str, Any]:
        """
        Get token type recommendations based on intended use.

        Args:
            intended_use: Description of the intended use

        Returns:
            Recommendation information
        """
        use_lower = intended_use.lower()

        if any(word in use_lower for word in ['learn', 'study', 'test', 'develop']):
            recommended_type = 'read'
        elif any(word in use_lower for word in ['share', 'upload', 'publish', 'create']):
            recommended_type = 'write'
        elif any(word in use_lower for word in ['commercial', 'enterprise', 'team', 'sensitive']):
            recommended_type = 'fine_grained'
        else:
            recommended_type = 'read'  # Default to read

        return {
            'recommended_type': recommended_type,
            'type_info': self.token_types[recommended_type],
            'explanation': f"Based on your intended use ('{intended_use}'), "
                           f"we recommend a {recommended_type} token."
        }
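
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the manager itself).
# The database path and token value below are placeholders, and the demo
# assumes the optional HF_TOKEN_* environment variables described above may
# be unset.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    manager = TokenManager(db_path="database/demo_tokens.db")  # hypothetical path

    # Store a placeholder token and mark it as the default
    manager.save_token(
        name="demo_read_token",
        token="hf_xxxxxxxxxxxxxxxxxxxx",  # placeholder, not a real credential
        token_type="read",
        description="Demo token",
        is_default=True,
    )

    # Pick the best available token for a download task
    token = manager.get_token_for_task("download")
    print(f"Token available for download: {token is not None}")

    # List stored tokens (metadata only; tokens stay encrypted at rest)
    for info in manager.list_tokens():
        print(info["name"], info["type"], "(default)" if info["is_default"] else "")

    # Ask for a token-type recommendation based on intended use
    print(manager.get_token_recommendations("share a trained model")["recommended_type"])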