热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用django+rpc进行服务内部交互

一、为什么使用rpc。

1)相比uwsgi,使用rpc的长连接可以不需要频繁创建连接,提高传输效率。

2)rpc支持同步和异步,对于不需要等待返回的消息可以不等待返回继续运行,减少客户端等待时间。

3)使用rpc入口是我们自己定义的,可以根据不同消息类型定制不同的策略。

 

二、设计思路

使用统一入口,采用django的url resolve匹配,然后完成调用,不改变django rest接口的开发模式。

服务端处理采用同步异步分离,异步任务用单独的进程处理,并为异步任务制定处理策略:

1)对于同步任务,仍然需要立即调用返回。

2)对于异步任务,可以进行任务分级:

      一级是重要任务,属于系统能力不足时必须优先保障的;

      二级任务,在系统能力足够时仍然需要执行,一旦能力不足,优先保障一级任务;

3)对异步任务,制定执行策略:

      一是必须执行的任务,这部分任务即使积压也有一条条全部执行完成;

      二是只需要执行最后一条的,常见于更新信息,对于积压多条的同一消息,丢弃前面的,保留最后一条;

      三是可丢弃的,遇到性能不足,这一类消息不执行,直接丢弃。

 

三、 grpc的proto文件

syntax = "proto3";
package rpc;
service RPCServer {
  rpc handel(Input) returns (Output){}
}

message Input {
  string params = 1;
}

message Output {
  string content = 1;
}

入参为Input,返回为Output,所有接口调用都走这边。

 

四、客户端调用

import grpc
import time
import json
import traceback
import threading
import uuid
from datetime import datetime

from . import data_pb2, data_pb2_grpc

_HOST = ''
_PORT = ''
CHANNEL = grpc.insecure_channel(_HOST + ':' + _PORT)


class ManoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return str(obj)
        if isinstance(obj, uuid.UUID):
            return str(obj)
        return json.JSONEncoder.default(self, obj)


def mano_encode(data):
    return json.dumps(data, cls=ManoEncoder)


def call_rpc(url, headers, resource, content, logger):
    try:
        params = json.dumps({
            'url': url,
            'headers': headers,
            'method': resource['method'],
            'content': content
        })
        timeout = resource.get('timeout', 5)
        client = data_pb2_grpc.RPCServerStub(CHANNEL)
        response = client.handel.future(data_pb2.Input(params=params), timeout)
        while not response.done():
            time.sleep(0.01)
        result = json.loads(response.result().content)
        print(result['status_code'])
        return result['status_code'], mano_encode(result['data'])
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (url, err.message))
        return '409', err.message

入参params需包含:rest url,头信息headers,rest类型,以及request body;

结果采用异步获取,不持续占用连接,对于不需要结果的,可以不等待,这边没写。

 

五、服务端实现

import os
import django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "*.settings")
django.setup()

import grpc
import json
import time
import random
import traceback
import threading
import uuid
import logging
from datetime import datetime

from concurrent import futures
from multiprocessing import Process, Queue, Value
from Queue import Queue as ManoQueue

from . import data_pb2, data_pb2_grpc
from django.urls import get_resolver
from django.utils.functional import cached_property

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_HOST = '[::]'
_PORT = '12330'
_PROCESS_COUNT = 2
RESOLVER = get_resolver()
logger = logging.getLogger(__name__)

message_queue = Queue()  # 异步任务队列,用于进程通信
status_level2 = Value('I', 1)  # 二级队列状态,用于进程通信


class ManoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return str(obj)
        if isinstance(obj, uuid.UUID):
            return str(obj)
        return json.JSONEncoder.default(self, obj)


def mano_encode(data):
    return json.dumps(data, cls=ManoEncoder)


class RPCServer(data_pb2_grpc.RPCServerServicer):
    def handel(self, request, context):
        input_info = json.loads(request.params)
        if input_info.get('reply', True) is True:  # reply为True代表同步,否则异步
            res_url = input_info['url']
            headers = input_info['headers']
            method = input_info['method']
            content = input_info['content']
            status_code, data = self.call_sync(res_url, headers, method, content)
            return data_pb2.Output(cOntent=mano_encode({'data': data, 'status_code': status_code}))
        else:
            if input_info['queue_detail']['level'] == 2 and not status_level2:
                data = 'queue of status level2 is not active'
                status_code = '409'
            else:
                message_queue.put(request.params)
                data = 'success'
                status_code = '201'
            return data_pb2.Output(cOntent=mano_encode({'data': data, 'status_code': status_code}))

    @staticmethod
    def call_sync(res_url, headers, method, content):
        try:
            resp_status, resp_body = call_inner(res_url, headers, method, content, logger)
            return resp_status, resp_body
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('call url %s failed, msg is %s' % (res_url, err.message))
            return '409', err.message


def main():  # rpc 服务主进程
    bind_address = '%s:%s' % (_HOST, _PORT)
    _run_server(bind_address)  # 启动rpc进程
    _run_queue_process()  # 启动异步任务处理进程


def _run_server(bind_address):
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=100, ))
    data_pb2_grpc.add_RPCServerServicer_to_server(RPCServer(), grpc_server)
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()


def _run_queue_process():
    worker = Process(target=_handle_no_wait_request, args=(message_queue, status_level2,))
    worker.start()
    worker.join()


def _handle_no_wait_request(q, status_2):  # 异步任务分类
    first_order_queue = [ManoQueue(maxsize=0), list()]
    second_order_queue = [ManoQueue(maxsize=1000), list()]
    mano_queue = [first_order_queue, second_order_queue]
    thread_pool = futures.ThreadPoolExecutor(max_workers=50)
    threading.Thread(target=_start_message_monitor, args=(q, mano_queue, status_2,)).start()  # 根据策略进行异步任务分类
    while True:
        num_threads = len(thread_pool._threads)
        if num_threads <50:
            input_info = _get_request(mano_queue)  # 获取本次需执行的任务,每个队列机会均等
            res_url = input_info['url']
            headers = input_info['headers']
            method = input_info['method']
            content = input_info['content']
            thread_pool.submit(RPCServer.call_sync, res_url, headers, method, content)  # 交给工作线程
            logger.info('handle success')
        else:
            logger.info('process busy')
            time.sleep(0.1)


def _start_message_monitor(q, mano_queue, status_2):
    while True:
        data = q.get()
        _handel_by_queue(data, mano_queue, status_2)


def _get_request(mano_queue):
    active_index = _get_active_queue(mano_queue)
    if active_index:
        index = random.choice(active_index)
        i, k = int(index.split('_')[0]), int(index.split('_')[1])
        q = mano_queue[i][k]
        if isinstance(q, ManoQueue):
            request_info = json.loads(q.get())
        else:
            request_info = json.loads(q.pop(0))
    else:
        request_info = {}
    return request_info


def _get_active_queue(mano_queue):
    active_index = []
    if not mano_queue[0][0].empty():
        active_index.append('0_0')
    if not mano_queue[1][0].empty():
        active_index.append('1_0')
    if len(mano_queue[0][1]) != 0:
        active_index.append('0_1')
    if len(mano_queue[1][1]) != 0:
        active_index.append('1_1')
    return active_index


def _handel_by_queue(data, mano_queue, status_2):  # 根据请求级别进行消息分类
    input_info = json.loads(data)
    level = input_info['queue_detail']['level']
    policy = input_info['queue_detail']['limit_policy']
    if level == 1:
        _handel_by_policy(mano_queue[0], policy, data)
    elif level == 2:
        request_queue = mano_queue[1]
        _handel_by_policy(mano_queue[1], policy, data)
        if request_queue[0].qsize() > 0.8 * request_queue[0].maxsize:
            status_2.value = 0
        elif request_queue[0].qsize() <0.6 * request_queue[0].maxsize:
            status_2.value = 1


def _handel_by_policy(request_queue, policy, data):  # 根据请求策略进行消息分类
    if policy == 'execute':  # 必须执行的异步任务
        request_queue[0].put(data)
    elif policy == 'last':  # 阻塞时可以只执行最后一次的异步任务
        try:
            while True:
                request_queue[1].remove(data)
        except ValueError:
            request_queue[1].append(data)
    else:  # 阻塞时可以丢弃的异步任务
        if request_queue[0].qsize :
            request_queue[0].put(data)  # 先丢弃前面的


def call_inner(res_url, headers, method, content, logger):
    logger.info('[call_inner] url is %s' % res_url)
    url, params = get_url_and_params(res_url)
    meta = get_meta(headers)
    request = Request(url=url, full_url=res_url, params=params, cOntent=content, meta=meta, method=method)
    resolver_match = RESOLVER.resolve(url)  # URL 匹配
    callback, callback_args, callback_kwargs = resolver_match
    call_method = getattr(callback.view_class(), method.lower())
    if not method:
        return '404', 'not support this operate'
    try:
        if callback_kwargs:
            result = call_method(request, '', **callback_kwargs)
        else:
            result = call_method(request)
    except BaseException as err:
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (res_url, err.message))
        return '409', err.message
    return str(result.status_code), result.data


def get_url_and_params(full_url):
    params = {}
    if '?' in full_url:
        url, params_str = full_url.split('?')[0], full_url.split('?')[1]
        for key_value in params_str.split('&'):
            key, value = key_value.split('=')[0], key_value.split('=')[1]
            params[key] = value
    else:
        url = full_url
    return url, params


def get_meta(headers):
    meta = {}
    # custom
    return meta


class Request(object):
    def __init__(self, **kwargs):
        self.data = self.get_content(kwargs['content'])
        self.query_params = kwargs['params']
        self.path = kwargs['url']
        self.full_path = kwargs['full_url']
        self.FILES = {}
        self.META = kwargs['meta']
        self.COOKIES = {}
        self._request = InnerOBJ(kwargs['method'])

    @staticmethod
    def get_content(content):
        if not content:
            req_data = {}
        else:
            req_data = content if isinstance(content, dict) else json.loads(content)
        return req_data

    def __str__(self):
        return ' %s' % self.path

    @cached_property
    def GET(self):
        return self.query_params

    def get_full_path(self):
        return self.full_path


class InnerOBJ(object):
    def __init__(self, method):
        self.method = method.upper()


if __name__ == '__main__':
    main()

 


推荐阅读
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • 一、Tomcat安装后本身提供了一个server,端口配置默认是8080,对应目录为:..\Tomcat8.0\webapps二、Tomcat8.0配置多个端口,其实也就是给T ... [详细]
  • 为什么多数程序员难以成为架构师?
    探讨80%的程序员为何难以晋升为架构师,涉及技术深度、经验积累和综合能力等方面。本文将详细解析Tomcat的配置和服务组件,帮助读者理解其内部机制。 ... [详细]
  • Spring 切面配置中的切点表达式详解
    本文介绍了如何在Spring框架中使用AspectJ风格的切面配置,详细解释了切点表达式的语法和常见示例,帮助开发者更好地理解和应用Spring AOP。 ... [详细]
  • 1.创建目录mkdir-phomerocketmqnamesvr1data&&mkdir-phomerocketmqnamesvr1log&&mkdir-phomerocketm ... [详细]
  • 原文网址:https:www.cnblogs.comysoceanp7476379.html目录1、AOP什么?2、需求3、解决办法1:使用静态代理4 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • Java 零基础入门:SQL Server 学习笔记(第21篇)
    Java 零基础入门:SQL Server 学习笔记(第21篇) ... [详细]
  • 5分钟学会 gRPC
    5分钟学会gRPC-介绍我猜测大部分长期使用Java的开发者应该较少会接触gRPC,毕竟在Java圈子里大部分使用的还是DubboSpringClound这两类服务框架。我也是 ... [详细]
  • 交换机配置:intg100unshintvlani1ipadd192.168.56.177qstelseuser-iv4authaaaproinsshupl3qsshuserpyt ... [详细]
  • 本文详细介绍了如何在Linux系统(以CentOS为例)上彻底卸载Zimbra邮件系统,包括停止服务、删除文件和用户等步骤。 ... [详细]
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • 如何在MySQL中选择合适的表空间以优化性能和管理效率
    在MySQL中,合理选择表空间对于提升表的管理和访问性能至关重要。表空间作为MySQL中用于组织和管理数据的一种机制,能够显著影响数据库的运行效率和维护便利性。通过科学地配置和使用表空间,可以优化存储结构,提高查询速度,简化数据管理流程,从而全面提升系统的整体性能。 ... [详细]
  • 探讨 jBPM 数据库表结构设计的精要与实践
    探讨 jBPM 数据库表结构设计的精要与实践 ... [详细]
  • restful是这些年的高频词汇了,各大互联网公司也都纷纷推出了自己的restfulapi,其实restful和thrift,grpc类似,就是一种协议,但是这种协议有点特殊的就是 ... [详细]
author-avatar
哒哒愛嘬萌
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有