| """ | |
| Database Management System for Knowledge Distillation Platform | |
| نظام إدارة قواعد البيانات لمنصة تقطير المعرفة | |
| """ | |
| import json | |
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| import asyncio | |
| from datasets import load_dataset, Dataset | |
| from huggingface_hub import list_datasets | |
| logger = logging.getLogger(__name__) | |


class DatabaseManager:
    """
    Comprehensive database management system for the platform
    نظام إدارة قواعد البيانات الشامل للمنصة
    """

    def __init__(self, storage_path: str = "data/databases"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.config_file = self.storage_path / "databases_config.json"
        self.selected_databases_file = self.storage_path / "selected_databases.json"

        # Load existing configuration
        self.databases_config = self._load_config()
        self.selected_databases = self._load_selected_databases()

        logger.info(
            f"Database Manager initialized with {len(self.databases_config)} configured databases"
        )

    def _load_config(self) -> Dict[str, Any]:
        """Load databases configuration"""
        try:
            if self.config_file.exists():
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            # Initialize with default medical datasets
            default_config = self._get_default_medical_datasets()
            self._save_config(default_config)
            return default_config
        except Exception as e:
            logger.error(f"Error loading databases config: {e}")
            return {}

    def _save_config(self, config: Dict[str, Any]):
        """Save databases configuration"""
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error saving databases config: {e}")

    def _load_selected_databases(self) -> List[str]:
        """Load selected databases list"""
        try:
            if self.selected_databases_file.exists():
                with open(self.selected_databases_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            return []
        except Exception as e:
            logger.error(f"Error loading selected databases: {e}")
            return []

    def _save_selected_databases(self):
        """Save selected databases list"""
        try:
            with open(self.selected_databases_file, 'w', encoding='utf-8') as f:
                json.dump(self.selected_databases, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error saving selected databases: {e}")

    def _get_default_medical_datasets(self) -> Dict[str, Any]:
        """Get default medical datasets configuration"""
        return {
            "medical_meadow_medical_flashcards": {
                "name": "Medical Meadow Medical Flashcards",
                "name_ar": "بطاقات تعليمية طبية",
                "dataset_id": "medalpaca/medical_meadow_medical_flashcards",
                "category": "medical",
                "description": "Medical flashcards for educational purposes",
                "description_ar": "بطاقات تعليمية طبية لأغراض التعليم",
                "size": "~50MB",
                "language": "English",
                "modality": "text",
                "license": "Apache 2.0",
                "added_date": datetime.now().isoformat(),
                "status": "available"
            },
            "pubmed_qa": {
                "name": "PubMed QA",
                "name_ar": "أسئلة وأجوبة PubMed",
                "dataset_id": "pubmed_qa",
                "category": "medical",
                "description": "Question answering dataset based on PubMed abstracts",
                "description_ar": "مجموعة بيانات أسئلة وأجوبة مبنية على ملخصات PubMed",
                "size": "~100MB",
                "language": "English",
                "modality": "text",
                "license": "MIT",
                "added_date": datetime.now().isoformat(),
                "status": "available"
            },
            "medical_dialog": {
                "name": "Medical Dialog",
                "name_ar": "حوارات طبية",
                "dataset_id": "medical_dialog",
                "category": "medical",
                "description": "Medical conversation dataset",
                "description_ar": "مجموعة بيانات المحادثات الطبية",
                "size": "~200MB",
                "language": "English/Chinese",
                "modality": "text",
                "license": "CC BY 4.0",
                "added_date": datetime.now().isoformat(),
                "status": "available"
            }
        }
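
    # NOTE: the dictionary keys above are local identifiers, while the
    # "dataset_id" field holds the Hugging Face Hub path that is actually
    # passed to load_dataset(); the two are not always the same string.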

    async def search_huggingface_datasets(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Search for datasets on Hugging Face"""
        try:
            logger.info(f"Searching Hugging Face for datasets: {query}")
            # list_datasets() is a blocking network call; run it in a worker
            # thread so it does not stall the event loop.
            datasets = await asyncio.to_thread(
                lambda: list(list_datasets(search=query, limit=limit))
            )
            results = []
            for dataset in datasets:
                try:
                    dataset_info = {
                        "id": dataset.id,
                        "name": dataset.id.split('/')[-1],
                        "author": getattr(dataset, 'author', None) or dataset.id.split('/')[0],
                        "description": getattr(dataset, 'description', 'No description available'),
                        "tags": getattr(dataset, 'tags', []),
                        "downloads": getattr(dataset, 'downloads', 0),
                        "likes": getattr(dataset, 'likes', 0),
                        "created_at": getattr(dataset, 'created_at', None),
                        "last_modified": getattr(dataset, 'last_modified', None)
                    }
                    results.append(dataset_info)
                except Exception as e:
                    logger.warning(f"Error processing dataset {dataset.id}: {e}")
                    continue
            logger.info(f"Found {len(results)} datasets")
            return results
        except Exception as e:
            logger.error(f"Error searching Hugging Face datasets: {e}")
            return []
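
    # Example (hypothetical query and variable names):
    #   manager = DatabaseManager()
    #   hits = await manager.search_huggingface_datasets("radiology", limit=10)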

    async def add_database(self, database_info: Dict[str, Any]) -> bool:
        """Add a new database to the configuration"""
        try:
            database_id = database_info.get('dataset_id') or database_info.get('id')
            if not database_id:
                raise ValueError("Database ID is required")

            # Validate that the dataset exists and is accessible.
            validation_result = await self.validate_dataset(database_id)
            if not validation_result['valid']:
                raise ValueError(f"Dataset validation failed: {validation_result['error']}")

            # Prepare the database configuration entry.
            config = {
                "name": database_info.get('name', database_id.split('/')[-1]),
                "name_ar": database_info.get('name_ar', ''),
                "dataset_id": database_id,
                "category": database_info.get('category', 'general'),
                "description": database_info.get('description', ''),
                "description_ar": database_info.get('description_ar', ''),
                "size": database_info.get('size', 'Unknown'),
                "language": database_info.get('language', 'Unknown'),
                "modality": database_info.get('modality', 'text'),
                "license": database_info.get('license', 'Unknown'),
                "added_date": datetime.now().isoformat(),
                "status": "available",
                "validation": validation_result
            }

            # Add to the configuration and persist it.
            self.databases_config[database_id] = config
            self._save_config(self.databases_config)
            logger.info(f"Added database: {database_id}")
            return True
        except Exception as e:
            logger.error(f"Error adding database: {e}")
            return False

    async def validate_dataset(self, dataset_id: str) -> Dict[str, Any]:
        """Validate that a dataset exists and is accessible"""
        try:
            logger.info(f"Validating dataset: {dataset_id}")

            # Stream the dataset and pull a single sample in a worker thread,
            # so a slow download does not block the event loop.
            def _probe() -> Dict[str, Any]:
                dataset = load_dataset(dataset_id, split="train", streaming=True)
                return next(iter(dataset))

            sample = await asyncio.to_thread(_probe)
            features = list(sample.keys()) if sample else []
            return {
                "valid": True,
                "features": features,
                "sample_keys": features,
                "accessible": True,
                "error": None
            }
        except Exception as e:
            logger.warning(f"Dataset validation failed for {dataset_id}: {e}")
            return {
                "valid": False,
                "features": [],
                "sample_keys": [],
                "accessible": False,
                "error": str(e)
            }
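
    # The mapping returned above is persisted verbatim by add_database(),
    # which stores it under the "validation" key of the new config entry.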

    def get_all_databases(self) -> Dict[str, Any]:
        """Get all configured databases"""
        return self.databases_config

    def get_selected_databases(self) -> List[str]:
        """Get list of selected database IDs"""
        return self.selected_databases

    def select_database(self, database_id: str) -> bool:
        """Select a database for use"""
        try:
            if database_id not in self.databases_config:
                raise ValueError(f"Database {database_id} not found in configuration")
            if database_id not in self.selected_databases:
                self.selected_databases.append(database_id)
                self._save_selected_databases()
            logger.info(f"Selected database: {database_id}")
            return True
        except Exception as e:
            logger.error(f"Error selecting database: {e}")
            return False

    def deselect_database(self, database_id: str) -> bool:
        """Deselect a database"""
        try:
            if database_id in self.selected_databases:
                self.selected_databases.remove(database_id)
                self._save_selected_databases()
            logger.info(f"Deselected database: {database_id}")
            return True
        except Exception as e:
            logger.error(f"Error deselecting database: {e}")
            return False

    def remove_database(self, database_id: str) -> bool:
        """Remove a database from configuration"""
        try:
            if database_id not in self.databases_config:
                logger.warning(f"Database {database_id} not found in configuration")
                return False
            del self.databases_config[database_id]
            self._save_config(self.databases_config)
            if database_id in self.selected_databases:
                self.selected_databases.remove(database_id)
                self._save_selected_databases()
            logger.info(f"Removed database: {database_id}")
            return True
        except Exception as e:
            logger.error(f"Error removing database: {e}")
            return False

    def get_database_info(self, database_id: str) -> Optional[Dict[str, Any]]:
        """Get detailed information about a specific database"""
        return self.databases_config.get(database_id)

    def get_databases_by_category(self, category: str) -> Dict[str, Any]:
        """Get databases filtered by category"""
        return {
            db_id: db_info
            for db_id, db_info in self.databases_config.items()
            if db_info.get('category') == category
        }

    async def load_selected_datasets(self, max_samples: int = 1000) -> Dict[str, Any]:
        """Load data from selected datasets"""
        loaded_datasets = {}
        for database_id in self.selected_databases:
            info = self.databases_config.get(database_id, {})
            # Resolve the Hub path: config keys are local identifiers, while
            # "dataset_id" holds the path expected by load_dataset().
            dataset_id = info.get("dataset_id", database_id)
            try:
                logger.info(f"Loading dataset: {dataset_id}")

                def _take_samples(path: str = dataset_id) -> List[Dict[str, Any]]:
                    dataset = load_dataset(path, split="train", streaming=True)
                    return list(dataset.take(max_samples))

                # Stream the samples in a worker thread so the download does
                # not block the event loop.
                samples = await asyncio.to_thread(_take_samples)
                loaded_datasets[database_id] = {
                    "samples": samples,
                    "count": len(samples),
                    "info": info
                }
                logger.info(f"Loaded {len(samples)} samples from {database_id}")
            except Exception as e:
                logger.error(f"Error loading dataset {database_id}: {e}")
                loaded_datasets[database_id] = {
                    "samples": [],
                    "count": 0,
                    "error": str(e),
                    "info": info
                }
        return loaded_datasets
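

# A minimal usage sketch, not part of the original module: it assumes the
# Hugging Face Hub is reachable from this process. The query string, sample
# counts, and variable names below are illustrative only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def _demo():
        manager = DatabaseManager()

        # Search the Hub and print the top hits.
        for hit in await manager.search_huggingface_datasets("medical qa", limit=5):
            print(hit["id"], hit["downloads"])

        # Select one of the bundled default datasets and load a few samples.
        manager.select_database("medical_meadow_medical_flashcards")
        loaded = await manager.load_selected_datasets(max_samples=10)
        for dataset_key, payload in loaded.items():
            print(dataset_key, payload["count"], payload.get("error"))

    asyncio.run(_demo())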