Python3 Django 使用原生SQL实现分页


python3 django 使用原生SQL语句分页-排序-查询

django使用原生sql进行分页,类似django rest framework

-- 用DESC 表示按照倒序排序 (即: 从大到小) 降序
order by xxx desc
-- 默认 ASC 表示按照升序排序 (即: 从小到大) 升序
order by xxx

新建Python Package 存放封装的文件

封装 sql_connections.py 文件 (处理sql语句)

from django.db import connections


def exec_sql(sql, params=None, db='default'):
    """
    执行sql,例如insert和update
    :param sql: sql语句
    :param params: sql语句参数
    :param db: Django数据库名
    """
    cursor = connections[db].cursor()
    cursor.execute(sql, params)
    cursor.close()
    cursor.last_insert_id()
    return True


def fetchone_sql(sql, params=None, db='default', flat=False):
    """
    返回一行数据
    :param sql: sql语句
    :param params: sql语句参数
    :param db: Django数据库名
    :param flat: 如果为True,只返回第一个字段值,例如:id
    :return: 例如:(id, 'username', 'first_name')
    """
    cursor = connections[db].cursor()
    cursor.execute(sql, params)
    fetchone = cursor.fetchone()
    cursor.close()
    if fetchone:
        fetchone = fetchone[0] if flat else fetchone
    return fetchone


def fetchone_to_dict(sql, params=None, db='default'):
    """
    返回一行数据
    :param sql: sql语句
    :param params: sql语句参数
    :param db: Django数据库名
    :return: 例如:{"id": id, "username": 'username', "first_name": 'first_name'}
    """
    cursor = connections[db].cursor()
    cursor.execute(sql, params)
    desc = cursor.description
    row = dict(zip([col[0] for col in desc], cursor.fetchone()))
    cursor.close()
    return row


def fetchall_sql(sql, params=None, db='default', flat=False):
    """
    返回全部数据
    :param sql: sql语句
    :param params: sql语句参数
    :param db: Django数据库名
    :param flat: 如果为True,只返回每行数据第一个字段值的元组,例如:(id1, id2, id3)
    :return: 例如:[(id, 'username', 'first_name')]
    """
    cursor = connections[db].cursor()
    cursor.execute(sql, params)
    fetchall = cursor.fetchall()
    cursor.close()
    if fetchall:
        fetchall = tuple([o[0] for o in fetchall]) if flat else fetchall
    return fetchall


def fetchall_to_dict(sql, params=None, db='default'):
    """
    返回全部数据
    :param sql: sql语句
    :param params: sql语句参数
    :param db: Django数据库名
    :return: 例如:[{"id": id, "username": 'username', "first_name": 'first_name'}]
    """
    cursor = connections[db].cursor()
    cursor.execute(sql, params)
    desc = cursor.description
    object_list = [
        dict(zip([col[0] for col in desc], row))
        for row in cursor.fetchall()
    ]
    cursor.close()
    return object_list


def get_s_sql(table, keys, conditions, isdistinct=0):
    '''
    生成select的sql语句
    @table,查询记录的表名
    @key,需要查询的字段
    @conditions,插入的数据,字典
    @isdistinct,查询的数据是否不重复
    '''
    if isdistinct:
        sql = 'select distinct %s ' % ",".join(keys)
    else:
        sql = 'select  %s ' % ",".join(keys)
    sql += ' from %s ' % table
    if conditions:
        sql += ' where %s ' % dict_2_str_and(conditions)
    return sql


def dict_2_str(dictin):
    '''
    将字典变成,key='value',key='value' 的形式
    '''
    tmplist = []
    for k, v in dictin.items():
        tmp = "{k} = %({v})s".format(k=str(k), v=str(v))
        tmplist.append(' ' + tmp + ' ')
    return ','.join(tmplist)


def dict_2_str_and(dictin):
    '''
    将字典变成,key='value' and key='value'的形式
    '''
    tmplist = []
    for k, v in dictin.items():
        tmp = "{k} = %({v})s".format(k=str(k), v=str(v))
        tmplist.append(' ' + tmp + ' ')
    return ' and '.join(tmplist)

封装 sql_paginator.py 文件 (分页)

from urllib import parse

from django.core.paginator import Paginator
from django.utils.encoding import force_str

from .sql_connections import fetchone_sql, fetchall_to_dict

page_query_param = "page"


def replace_query_param(url, key, val):
    """
    Given a URL and a key/val pair, set or replace an item in the query
    parameters of the URL, and return the new URL.
    """
    (scheme, netloc, path, query, fragment) = parse.urlsplit(force_str(url))
    query_dict = parse.parse_qs(query, keep_blank_values=True)
    query_dict[force_str(key)] = [force_str(val)]
    query = parse.urlencode(sorted(list(query_dict.items())), doseq=True)
    return parse.urlunsplit((scheme, netloc, path, query, fragment))


def remove_query_param(url, key):
    """
    Given a URL and a key/val pair, remove an item in the query
    parameters of the URL, and return the new URL.
    """
    (scheme, netloc, path, query, fragment) = parse.urlsplit(force_str(url))
    query_dict = parse.parse_qs(query, keep_blank_values=True)
    query_dict.pop(key, None)
    query = parse.urlencode(sorted(list(query_dict.items())), doseq=True)
    return parse.urlunsplit((scheme, netloc, path, query, fragment))


def get_next_link(request, page):
    if not page.has_next():
        return None
    url = request.build_absolute_uri()
    page_number = page.next_page_number()
    return replace_query_param(url, page_query_param, page_number)


def get_previous_link(request, page):
    if not page.has_previous():
        return None
    url = request.build_absolute_uri()
    page_number = page.previous_page_number()
    if page_number == 1:
        return remove_query_param(url, page_query_param)
    return replace_query_param(url, page_query_param, page_number)


def paginator(request, data_list, page_size, page):
    """
    封装Django分页
    :param data_list: sql语句
    :param page_size: 每页显示多少条数据
    :param page: 当前第几页
    :return
    """
    page = int(page)
    page_size = int(page_size)
    pages = Paginator(data_list, page_size)
    """封装Django分页"""
    # 防止超出页数
    if not page > 0:
        page = 1
    if page > pages.num_pages:
        page = pages.num_pages
    p = pages.page(page)  # 获取本页数据

    return dict(
        [
            ("page", page),
            ("total_page", pages.num_pages),
            ("count", pages.count),
            ("next", get_next_link(request, p)),
            ("previous", get_previous_link(request, p)),
            ("items", p.object_list),
        ]
    )


class QueryWrapper(object):
    """查询集包装器。实现django Paginator需要的必要方法,实现和query一样使用Paginator分页"""

    def __init__(self, sql, count_word=None, params=None, db="default"):
        """
        :param sql: sql语句
        :param count_word: 计算总数的字段,select count(count_word) from table,命名方式:_count.字段名
        :param params: sql语句的params参数
        :param db: 数据库名称(Django配置)
        """
        self.db = db
        self.sql = sql
        self.count_word = count_word
        self.params = params

    def count(self):
        """计算总页数"""
        if self.count_word:
            sql = """select count(%s) from (%s) _count""" % (self.count_word, self.sql)
        else:
            sql = """select count(*) from (%s) _count""" % (self.sql)
        return fetchone_sql(sql, self.params, db=self.db, flat=True)  # 返回总页数

    def __getitem__(self, index):
        """ index: slice(0, 3, None)"""
        sql = self.sql + ' LIMIT {start}, {num}'.format(start=index.start, num=index.stop - index.start)
        return fetchall_to_dict(sql, self.params, db=self.db)  # 字典列表形式返回

操作使用

views.py里面调用

from utils.sql_paginator_init.sql_paginator import QueryWrapper, paginator # 导包

class KeyWordsSummaryView(ListAPIView):
    authentication_classes = [MyBaseAuthentication, ] # 认证

    def list(self, request, *args, **kwargs):
        category = request.GET.get('category') # 分类id
        filter_keyword = request.GET.get('k') # 搜索词
        filter_sort_keyword = request.GET.get('sort', 'search_volume') # 排序
        filter_asc_desc = request.GET.get('asc_desc', 'desc') # 正序 降序
        page = request.GET.get('page', 1) # 分页页码
        page_size = 100 # 默认值 每页100条
        if not filter_keyword or filter_keyword is None:
            # 如果排序 不带搜索词
            sql = """
                select 
                id,
                xxx1 as a1,
                xxx2 as a2,
                xxx3 as a3,
                xxx4 as a4,
                xxx5 as a5
                 from db_table 
                 where category=%(cid)s AND is_delete=FALSE order by %(sort)s %(asc_desc)s
            """
            k = None
        else:
            # 带搜索词的 分页
            k = "%" + str(filter_keyword) + "%"
            sql = """
                    select 
                id,
                xxx1 as 参数名1,
                xxx2 as 参数名2,
                xxx3 as 参数名3,
                xxx4 as 参数名4,
                xxx5 as 参数名5
                 from db_table 
                 where category=%(cid)s and xxx1 like %(k)s AND is_delete=FALSE order by %(sort)s %(asc_desc)s
                """
        params = {'cid': category, "k": k, 'sort': filter_sort_keyword, 'asc_desc': filter_asc_desc}
        count_word = "_count.id"
        query = QueryWrapper(sql, count_word, params) # 注意该方法里面的参数顺序 以及作用
        data = paginator(request, query, page_size, page) # 封装好之后的数据

        return Response({"code": RET.OK, "data": data})

返回值参考 假如数据够 从第2页开始

{
    "code": "0",                                               # 状态码
    "data": {
        "page": 2,                                             # 当前页码
        "total_page": 10,                                      # 总页数
        "count": 1000,                                         # 总数据量
        "next": "http://127.0.0.1/url/?category=151&page=3",   # 下一页
        "previous": "http://127.0.0.1/url/?category=151",      # 上一页
        "items": [                                             # 数据列表[{},{}]
            {
                "id": 9106328,
                "a1": "wifi 5ghz extender",
                "a2": "[]",
                "a3": 2,
                "a4": 22,
                "a5": 0
            }
            ...
        ]
    }
}


文章作者: 柒仔
文章链接: /article/44/
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 XiaoLiu!
侵权声明: 若无意对您的文章造成侵权,请您留言,博主看到后会及时处理,谢谢。
评论-----昵称和邮箱必填,网址选填
  目录