public_sentiment/web/dao/base_dao.py
2024-09-18 13:38:24 +08:00

158 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
from django.db.models.query import QuerySet
from django.db import models
from web.manager.snowflake_manager import SnowflakeManager
class BaseDao:
    """
    Base class for data-access objects (DAOs).

    Wraps the common create / delete / update / query operations around a
    Django model. Subclasses point ``model_class`` at their concrete model.
    """

    # Subclasses must override this with their concrete model class.
    model_class = models.Model
    # Default chunk size passed to ``bulk_create`` by ``save_batch``.
    save_batch_size = 1000
    # Class-level ID generator, shared by all instances (and by subclasses
    # that do not override it).
    snowflake_manager = SnowflakeManager()

    def _build_queryset(self, filter_kw: dict, exclude_kw: dict) -> QuerySet:
        """
        Build the ``filter(...).exclude(...)`` queryset used by the query helpers.

        Both arguments are unpacked as keyword arguments, so they must be
        dicts; passing ``None`` raises ``TypeError`` (deliberately not
        defaulted, so a missing filter can never silently widen a bulk
        delete or update).
        """
        return self.model_class.objects.filter(**filter_kw).exclude(**exclude_kw)

    def save(self, obj) -> bool:
        """
        Insert a single object.

        Assigns a freshly generated id and the current time as
        ``create_time`` before saving.

        :param obj: model instance to persist
        :return: ``False`` if ``obj`` is falsy, otherwise ``True``
        """
        if not obj:
            return False
        obj.id = self.snowflake_manager.next_id()
        obj.create_time = datetime.now()
        obj.save()
        return True

    def save_batch(self, objs, *, batch_size=save_batch_size) -> bool:
        """
        Insert many objects with a single ``bulk_create`` call.

        Every object receives a freshly generated id and the same
        ``create_time`` value, mirroring what ``save`` does for one object.

        :param objs: iterable of model instances to persist
        :param batch_size: chunk size forwarded to ``bulk_create``
        :return: ``False`` if ``objs`` is empty, otherwise ``True``
        """
        if not objs:
            return False
        # Materialise once so a one-shot iterable is not exhausted by the
        # loop below before it reaches bulk_create.
        objs = list(objs)
        now = datetime.now()
        for obj in objs:
            # Use the shared generator; the previous bare ``snowflake`` name
            # was undefined and raised NameError on every call.
            obj.id = self.snowflake_manager.next_id()
            obj.create_time = now
        self.model_class.objects.bulk_create(objs, batch_size=batch_size)
        return True

    def delete(self, obj) -> bool:
        """
        Delete a single object.

        :param obj: model instance to delete
        :return: ``False`` if ``obj`` is falsy, otherwise ``True``
        """
        if not obj:
            return False
        obj.delete()
        return True

    def delete_batch(self, objs) -> bool:
        """
        Delete several objects, one ``delete`` call per object.

        :param objs: iterable of model instances to delete
        :return: ``False`` if ``objs`` is empty, otherwise ``True``
        """
        if not objs:
            return False
        for obj in objs:
            self.delete(obj)
        return True

    def delete_batch_by_query(self, filter_kw: dict, exclude_kw: dict) -> bool:
        """
        Delete every record selected by the given criteria.

        NOTE(review): with empty criteria this presumably matches every row
        of the table -- callers should confirm their filters are non-empty.

        :param filter_kw: keyword lookups passed to ``filter``
        :param exclude_kw: keyword lookups passed to ``exclude``
        :return: always ``True``
        """
        self._build_queryset(filter_kw, exclude_kw).delete()
        return True

    def delete_by_fake(self, obj) -> bool:
        """
        Soft-delete an object by setting ``is_deleted`` and saving it.

        :param obj: model instance to mark as deleted
        :return: ``False`` if ``obj`` is ``None``, otherwise ``True``
        """
        if obj is None:
            return False
        obj.is_deleted = True
        obj.save()
        return True

    def update(self, obj) -> bool:
        """
        Persist the current state of a single object.

        :param obj: model instance to save
        :return: ``False`` if ``obj`` is falsy, otherwise ``True``
        """
        if not obj:
            return False
        obj.save()
        return True

    def update_batch(self, objs) -> bool:
        """
        Persist several objects, one ``update`` call per object.

        :param objs: iterable of model instances to save
        :return: ``False`` if ``objs`` is empty, otherwise ``True``
        """
        if not objs:
            return False
        for obj in objs:
            self.update(obj)
        return True

    def update_batch_by_query(self, query_kwargs: dict, exclude_kw: dict, newattrs_kwargs: dict) -> None:
        """
        Update every record selected by the given criteria in one query.

        Unlike the other mutators this method returns nothing; that is kept
        as-is so existing callers see no change.

        :param query_kwargs: keyword lookups passed to ``filter``
        :param exclude_kw: keyword lookups passed to ``exclude``
        :param newattrs_kwargs: field/value pairs passed to ``update``
        """
        self._build_queryset(query_kwargs, exclude_kw).update(**newattrs_kwargs)

    def find_one(self, filter_kw: dict, exclude_kw: dict, order_bys: list):
        """
        Return the first record selected by the given criteria.

        :param filter_kw: keyword lookups passed to ``filter``
        :param exclude_kw: keyword lookups passed to ``exclude``
        :param order_bys: ordering keys; may be empty or ``None``
        :return: the result of ``QuerySet.first()`` on the selection
        """
        return self.find_queryset(filter_kw, exclude_kw, order_bys).first()

    def find_queryset(self, filter_kw: dict, exclude_kw: dict, order_bys: list) -> QuerySet:
        """
        Return a QuerySet for the given criteria.

        :param filter_kw: keyword lookups passed to ``filter``
        :param exclude_kw: keyword lookups passed to ``exclude``
        :param order_bys: ordering keys; may be empty or ``None``
        :return: the (optionally ordered) QuerySet
        """
        query_set = self._build_queryset(filter_kw, exclude_kw)
        if order_bys:
            # Pass all keys in ONE call: each order_by() call replaces the
            # previous ordering, so calling it per key kept only the last key.
            query_set = query_set.order_by(*order_bys)
        return query_set

    def find_list(self, filter_kw: dict, exclude_kw: dict, order_bys: list) -> list:
        """
        Return the records selected by the given criteria as a list.

        :param filter_kw: keyword lookups passed to ``filter``
        :param exclude_kw: keyword lookups passed to ``exclude``
        :param order_bys: ordering keys; may be empty or ``None``
        :return: list of model instances
        """
        return list(self.find_queryset(filter_kw, exclude_kw, order_bys))

    def is_exists(self, filter_kw: dict, exclude_kw: dict) -> bool:
        """
        Tell whether at least one record is selected by the given criteria.

        :param filter_kw: keyword lookups passed to ``filter``
        :param exclude_kw: keyword lookups passed to ``exclude``
        :return: the result of ``QuerySet.exists()``
        """
        return self._build_queryset(filter_kw, exclude_kw).exists()

    def get_count(self, filter_kw: dict, exclude_kw: dict) -> int:
        """
        Count the records selected by the given criteria.

        :param filter_kw: keyword lookups passed to ``filter``
        :param exclude_kw: keyword lookups passed to ``exclude``
        :return: the result of ``QuerySet.count()``
        """
        return self._build_queryset(filter_kw, exclude_kw).count()