热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用django+rpc进行服务内部交互

一、为什么使用rpc。

1)相比uwsgi,使用rpc的长连接可以不需要频繁创建连接,提高传输效率。

2)rpc支持同步和异步,对于不需要等待返回的消息可以不等待返回继续运行,减少客户端等待时间。

3)使用rpc入口是我们自己定义的,可以根据不同消息类型定制不同的策略。

 

二、设计思路

使用统一入口,采用django的url resolve匹配,然后完成调用,不改变django rest接口的开发模式。

服务端处理采用同步异步分离,异步任务用单独的进程处理,并为异步任务制定处理策略:

1)对于同步任务,仍然需要立即调用返回。

2)对于异步任务,可以进行任务分级:

      一级是重要任务,属于系统能力不足时必须优先保障的;

      二级任务,在系统能力足够时仍然需要执行,一旦能力不足,优先保障一级任务;

3)对异步任务,制定执行策略:

      一是必须执行的任务,这部分任务即使积压也有一条条全部执行完成;

      二是只需要执行最后一条的,常见于更新信息,对于积压多条的同一消息,丢弃前面的,保留最后一条;

      三是可丢弃的,遇到性能不足,这一类消息不执行,直接丢弃。

 

三、 grpc的proto文件

syntax = "proto3";
package rpc;
service RPCServer {
  rpc handel(Input) returns (Output){}
}

message Input {
  string params = 1;
}

message Output {
  string content = 1;
}

入参为Input,返回为Output,所有接口调用都走这边。

 

四、客户端调用

import grpc
import time
import json
import traceback
import threading
import uuid
from datetime import datetime

from . import data_pb2, data_pb2_grpc

_HOST = ''
_PORT = ''
CHANNEL = grpc.insecure_channel(_HOST + ':' + _PORT)


class ManoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return str(obj)
        if isinstance(obj, uuid.UUID):
            return str(obj)
        return json.JSONEncoder.default(self, obj)


def mano_encode(data):
    return json.dumps(data, cls=ManoEncoder)


def call_rpc(url, headers, resource, content, logger):
    try:
        params = json.dumps({
            'url': url,
            'headers': headers,
            'method': resource['method'],
            'content': content
        })
        timeout = resource.get('timeout', 5)
        client = data_pb2_grpc.RPCServerStub(CHANNEL)
        response = client.handel.future(data_pb2.Input(params=params), timeout)
        while not response.done():
            time.sleep(0.01)
        result = json.loads(response.result().content)
        print(result['status_code'])
        return result['status_code'], mano_encode(result['data'])
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (url, err.message))
        return '409', err.message

入参params需包含:rest url,头信息headers,rest类型,以及request body;

结果采用异步获取,不持续占用连接,对于不需要结果的,可以不等待,这边没写。

 

五、服务端实现

import os
import django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "*.settings")
django.setup()

import grpc
import json
import time
import random
import traceback
import threading
import uuid
import logging
from datetime import datetime

from concurrent import futures
from multiprocessing import Process, Queue, Value
from Queue import Queue as ManoQueue

from . import data_pb2, data_pb2_grpc
from django.urls import get_resolver
from django.utils.functional import cached_property

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_HOST = '[::]'
_PORT = '12330'
_PROCESS_COUNT = 2
RESOLVER = get_resolver()
logger = logging.getLogger(__name__)

message_queue = Queue()  # 异步任务队列,用于进程通信
status_level2 = Value('I', 1)  # 二级队列状态,用于进程通信


class ManoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return str(obj)
        if isinstance(obj, uuid.UUID):
            return str(obj)
        return json.JSONEncoder.default(self, obj)


def mano_encode(data):
    return json.dumps(data, cls=ManoEncoder)


class RPCServer(data_pb2_grpc.RPCServerServicer):
    def handel(self, request, context):
        input_info = json.loads(request.params)
        if input_info.get('reply', True) is True:  # reply为True代表同步,否则异步
            res_url = input_info['url']
            headers = input_info['headers']
            method = input_info['method']
            content = input_info['content']
            status_code, data = self.call_sync(res_url, headers, method, content)
            return data_pb2.Output(cOntent=mano_encode({'data': data, 'status_code': status_code}))
        else:
            if input_info['queue_detail']['level'] == 2 and not status_level2:
                data = 'queue of status level2 is not active'
                status_code = '409'
            else:
                message_queue.put(request.params)
                data = 'success'
                status_code = '201'
            return data_pb2.Output(cOntent=mano_encode({'data': data, 'status_code': status_code}))

    @staticmethod
    def call_sync(res_url, headers, method, content):
        try:
            resp_status, resp_body = call_inner(res_url, headers, method, content, logger)
            return resp_status, resp_body
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('call url %s failed, msg is %s' % (res_url, err.message))
            return '409', err.message


def main():  # rpc 服务主进程
    bind_address = '%s:%s' % (_HOST, _PORT)
    _run_server(bind_address)  # 启动rpc进程
    _run_queue_process()  # 启动异步任务处理进程


def _run_server(bind_address):
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=100, ))
    data_pb2_grpc.add_RPCServerServicer_to_server(RPCServer(), grpc_server)
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()


def _run_queue_process():
    worker = Process(target=_handle_no_wait_request, args=(message_queue, status_level2,))
    worker.start()
    worker.join()


def _handle_no_wait_request(q, status_2):  # 异步任务分类
    first_order_queue = [ManoQueue(maxsize=0), list()]
    second_order_queue = [ManoQueue(maxsize=1000), list()]
    mano_queue = [first_order_queue, second_order_queue]
    thread_pool = futures.ThreadPoolExecutor(max_workers=50)
    threading.Thread(target=_start_message_monitor, args=(q, mano_queue, status_2,)).start()  # 根据策略进行异步任务分类
    while True:
        num_threads = len(thread_pool._threads)
        if num_threads <50:
            input_info = _get_request(mano_queue)  # 获取本次需执行的任务,每个队列机会均等
            res_url = input_info['url']
            headers = input_info['headers']
            method = input_info['method']
            content = input_info['content']
            thread_pool.submit(RPCServer.call_sync, res_url, headers, method, content)  # 交给工作线程
            logger.info('handle success')
        else:
            logger.info('process busy')
            time.sleep(0.1)


def _start_message_monitor(q, mano_queue, status_2):
    while True:
        data = q.get()
        _handel_by_queue(data, mano_queue, status_2)


def _get_request(mano_queue):
    active_index = _get_active_queue(mano_queue)
    if active_index:
        index = random.choice(active_index)
        i, k = int(index.split('_')[0]), int(index.split('_')[1])
        q = mano_queue[i][k]
        if isinstance(q, ManoQueue):
            request_info = json.loads(q.get())
        else:
            request_info = json.loads(q.pop(0))
    else:
        request_info = {}
    return request_info


def _get_active_queue(mano_queue):
    active_index = []
    if not mano_queue[0][0].empty():
        active_index.append('0_0')
    if not mano_queue[1][0].empty():
        active_index.append('1_0')
    if len(mano_queue[0][1]) != 0:
        active_index.append('0_1')
    if len(mano_queue[1][1]) != 0:
        active_index.append('1_1')
    return active_index


def _handel_by_queue(data, mano_queue, status_2):  # 根据请求级别进行消息分类
    input_info = json.loads(data)
    level = input_info['queue_detail']['level']
    policy = input_info['queue_detail']['limit_policy']
    if level == 1:
        _handel_by_policy(mano_queue[0], policy, data)
    elif level == 2:
        request_queue = mano_queue[1]
        _handel_by_policy(mano_queue[1], policy, data)
        if request_queue[0].qsize() > 0.8 * request_queue[0].maxsize:
            status_2.value = 0
        elif request_queue[0].qsize() <0.6 * request_queue[0].maxsize:
            status_2.value = 1


def _handel_by_policy(request_queue, policy, data):  # 根据请求策略进行消息分类
    if policy == 'execute':  # 必须执行的异步任务
        request_queue[0].put(data)
    elif policy == 'last':  # 阻塞时可以只执行最后一次的异步任务
        try:
            while True:
                request_queue[1].remove(data)
        except ValueError:
            request_queue[1].append(data)
    else:  # 阻塞时可以丢弃的异步任务
        if request_queue[0].qsize :
            request_queue[0].put(data)  # 先丢弃前面的


def call_inner(res_url, headers, method, content, logger):
    logger.info('[call_inner] url is %s' % res_url)
    url, params = get_url_and_params(res_url)
    meta = get_meta(headers)
    request = Request(url=url, full_url=res_url, params=params, cOntent=content, meta=meta, method=method)
    resolver_match = RESOLVER.resolve(url)  # URL 匹配
    callback, callback_args, callback_kwargs = resolver_match
    call_method = getattr(callback.view_class(), method.lower())
    if not method:
        return '404', 'not support this operate'
    try:
        if callback_kwargs:
            result = call_method(request, '', **callback_kwargs)
        else:
            result = call_method(request)
    except BaseException as err:
        logger.error(traceback.format_exc())
        logger.error('call url %s failed, msg is %s' % (res_url, err.message))
        return '409', err.message
    return str(result.status_code), result.data


def get_url_and_params(full_url):
    params = {}
    if '?' in full_url:
        url, params_str = full_url.split('?')[0], full_url.split('?')[1]
        for key_value in params_str.split('&'):
            key, value = key_value.split('=')[0], key_value.split('=')[1]
            params[key] = value
    else:
        url = full_url
    return url, params


def get_meta(headers):
    meta = {}
    # custom
    return meta


class Request(object):
    def __init__(self, **kwargs):
        self.data = self.get_content(kwargs['content'])
        self.query_params = kwargs['params']
        self.path = kwargs['url']
        self.full_path = kwargs['full_url']
        self.FILES = {}
        self.META = kwargs['meta']
        self.COOKIES = {}
        self._request = InnerOBJ(kwargs['method'])

    @staticmethod
    def get_content(content):
        if not content:
            req_data = {}
        else:
            req_data = content if isinstance(content, dict) else json.loads(content)
        return req_data

    def __str__(self):
        return ' %s' % self.path

    @cached_property
    def GET(self):
        return self.query_params

    def get_full_path(self):
        return self.full_path


class InnerOBJ(object):
    def __init__(self, method):
        self.method = method.upper()


if __name__ == '__main__':
    main()

 


推荐阅读
  • 如何查询zone下的表的信息
    本文介绍了如何通过TcaplusDB知识库查询zone下的表的信息。包括请求地址、GET请求参数说明、返回参数说明等内容。通过curl方法发起请求,并提供了请求示例。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • Spring Boot 中 Java8 LocalDateTime 序列化问题
    LoginController页面如下:publicObjectlogin(@RequestBodyUseruser){returnxxxx ... [详细]
  • Abp+MongoDb改造默认的审计日志存储位置
    一、背景在实际项目的开发当中,使用AbpZero自带的审计日志功能写入效率比较低。其次审计日志数据量中后期十分庞大,不适合与业务数据存放在一起。所以我们可以重新实现A ... [详细]
  • 阿里云监控URL的配置笔记
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了阿里云监控URL的配置笔记相关的知识,希望对你有一定的参考价值。有很多细节需要记录 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了如何将CIM_DateTime解析为.Net DateTime,并分享了解析过程中可能遇到的问题和解决方法。通过使用DateTime.ParseExact方法和适当的格式字符串,可以成功解析CIM_DateTime字符串。同时还提供了关于WMI和字符串格式的相关信息。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 介绍平常在多线程开发中,总避免不了线程同步。本篇就对net多线程中的锁系统做个简单描述。目录一:lock、Monitor1:基础 ... [详细]
  • 在这分布式系统架构盛行的时代,很多互联网大佬公司开源出自己的分布式RPC系统框架,例如:阿里的dubbo,谷歌的gRPC,apache的Thrift。而在我们公司一直都在推荐使用d ... [详细]
  • 基于.NET Core框架nacos的简单应用
    什么是Nacos?服务(Service)是Nacos世界的一等公民。Nacos支持 ... [详细]
author-avatar
哒哒愛嘬萌
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有