File size: 1,555 Bytes
61d9463
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from typing import List, Dict, Optional
from tinydb import Query
from backend.app.database.base import BaseDB
from backend.app import schemas

class UserDB(BaseDB):
    """Data-access helper for the ``users`` table.

    Records are addressed by their own ``id`` field, which is distinct from
    the document IDs the underlying table assigns on insert.
    """

    def __init__(self):
        super().__init__()
        self.table = self.db.table('users')
        self.query = Query()

    def get_users(self, user_ids: List[str]) -> List[Dict]:
        """Return every stored user whose ``id`` is contained in *user_ids*."""
        return self.table.search(self.query.id.one_of(user_ids))

    def get_user(self, user_id: str) -> Optional[Dict]:
        """Return the stored user whose ``id`` equals *user_id*, if any."""
        return self.table.get(self.query.id == user_id)

    def create_users(self, users: List[schemas.UserBase]) -> List[Dict]:
        """Insert the users that are not stored yet and return the new records.

        A user is skipped when its ``id`` already exists in the table or when
        an earlier entry of *users* carried the same ``id`` (the first
        occurrence wins), so a single call never creates duplicate ids.

        Returns:
            The freshly inserted records, or an empty list when nothing was
            inserted.
        """
        # Start from the ids already in the table, then grow the set while
        # walking the batch so in-batch duplicates are filtered as well.
        seen_ids = {
            stored['id']
            for stored in self.get_users([user.id for user in users])
        }
        new_users = []
        for user in users:
            if user.id in seen_ids:
                continue
            seen_ids.add(user.id)
            new_users.append(user.dict())

        if not new_users:
            return []

        inserted_ids = self.table.insert_multiple(new_users)
        # Re-read by document ID so callers receive the records as stored.
        return [self.table.get(doc_id=doc_id) for doc_id in inserted_ids]

    def update_users(self, users: List[schemas.UserBase]) -> List[Dict]:
        """Overwrite the fields of each stored user that matches by ``id``.

        Users without a matching stored record are ignored.

        Returns:
            The stored records that were updated, in the order of *users*.
        """
        updated = []
        for user in users:
            # A falsy result from ``update`` means no record matched this id.
            if self.table.update(user.dict(), self.query.id == user.id):
                updated.append(self.table.get(self.query.id == user.id))
        return updated

    def delete_users(self, user_ids: List[str]) -> List[str]:
        """Remove the stored users whose ``id`` is in *user_ids*.

        Returns:
            The ``id`` values that were found in the table and removed; ids
            with no stored record are left out.
        """
        existing_ids = [user['id'] for user in self.get_users(user_ids)]

        if existing_ids:
            self.table.remove(self.query.id.one_of(existing_ids))
        return existing_ids