热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

搞定thrift双向消息

thrift作为脱胎于facebook的rpc框架,各方面都非常优秀。清晰的分层设计,多语言的支持,以及不输protocolbuffer的效率(compact下优于protocol

thrift作为脱胎于facebook的rpc框架,各方面都非常优秀。清晰的分层设计,多语言的支持,以及不输protocolbuffer的效率(compact下优于protocolbuffer),都让thrift拥有越来越多的使用者。

作为一个RPC框架,thrift支持的是open->client--rpc-->server->close的短连接模式。在实际应用中,却经常会有客户端建立连接后,等待服务端数据的长连接模式,也可以称为双向连接。通常的方案有三种,可参考http://dongxicheng.org/search-engine/thrift-bidirectional-async-rpc/,文中提到第三种方法会修改源码,而实际操作过程中发现这其实是作者小小的理解错误,实现thrift双向通信并没有这么复杂,经过一番实验,发现只需要如下理解和实现即可轻松实现一个thrift的双向连接。

  1. 双向连接的service必须为oneway,否则会因为recv函数抛出remote close异常
  2. 客户端重用建立client的protocol,开线程使用processor.Process(protocol,protocol)监听服务端callback的消息。
  3. 服务端使用ProcessorFactory,使用TConnectionInfo中的transport作为向客户端发送消息的client的transport

搞定以上三步,即可实现一个thrift双向连接,这里附上实验代码,客户端使用C#(sorry for my pool C#),服务端使用C++

thrift

service HandshakeService{
    oneway void HandShake();
}

service CallbackService{
    oneway void Push(1: string msg); 
}

client

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Thrift.Collections;
using Thrift.Protocol;
using Thrift.Server;
using Thrift.Transport;
using System.Threading;
using Thrift;
using System.IO;

namespace ThriftBidirection
{
    class Program
    {
        class CallbackServiceImply : CallbackService.Iface
        {
            int msgCount = 0;
            public void Push(string msg)
            {
                Console.WriteLine("receive msg {0}: {1}", msgCount++, msg);
            }
        }
        //服务处理线程
        static void ProcessThread(TProtocol protocol)
        {
            TProcessor processor = new CallbackService.Processor(new CallbackServiceImply());
            while (true)
            {
                try
                {
                    //////////////////////////////////////////////////////////////////////////
                    ///模仿server行为,同时重用client端protocol
                    ///相当于同时重用一个连接
                    while (processor.Process(protocol, protocol)) { };
                    ///connection lost, return
                    return;
                }
                catch (IOException) //not fatal error, resume
                {
                    continue;
                }
                catch (TException) //fatal error
                {
                    return;
                }
            }
        }
        //服务器状态监听线程
        static void MonitorThread(TTransport trans, Action<string> callback)
        {
            while (true)
            {
                try
                {
                    if (!trans.Peek())
                    {
                        callback("连接中断");
                    }
                    Thread.Sleep(3000);
                }
                catch (Thrift.TException ex)
                {
                    callback(ex.Message);
                    return;
                }
            }
        }

        static void Main(string[] args)
        {
            TTransport transport = new TBufferedTransport(new TSocket("localhost", 5555));
            TProtocol protocol = new TBinaryProtocol(transport);
            HandshakeService.Client client = new HandshakeService.Client(protocol);
            Action processAction = new Action(ProcessThread);
            Actionstring>> mOnitorAction= new Actionstring>>(MonitorThread);

            transport.Open();
            processAction.BeginInvoke(protocol, (result) =>
            {
                 processAction.EndInvoke(result);
            }, null);
            monitorAction.BeginInvoke(transport, (msg) =>
            {
                Console.WriteLine("连接中断: {0}", msg);
            }, (result) =>
            {

            }, null);

            for (int i = 0; i <100; ++i)
            {
                client.HandShake();
                Thread.Sleep(10);
            }
            Console.Read();
            transport.Close();
        }
    }
}

server

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "HandshakeService.h"
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include "CallbackService.h"

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace apache::thrift::concurrency;

using boost::make_shared;
using boost::shared_ptr;

class HandshakeServiceHandler : virtual public HandshakeServiceIf {
 public:
  HandshakeServiceHandler(const boost::shared_ptr &trans) 
      : m_client(make_shared(trans))
  {
      boost::once_flag flag = BOOST_ONCE_INIT;
      m_flag = flag;
  }

  virtual ~HandshakeServiceHandler()
  {
        m_thread->interrupt();
        m_thread->join();
  }

  void CallbackThread()
  {
      while(true)
      {
          try
          {
              m_client.Push("server push msg");
          }
          catch (TException)
          {
              return;
          }
          boost::this_thread::sleep_for(boost::chrono::milliseconds(20));
      }
  }

  void HandShake() {
    // Your implementation goes here
    printf("HandShake\n");
    boost::call_once(boost::bind(&HandshakeServiceHandler::_StartThread, this), m_flag);
  }

  void _StartThread()
  {
    m_thread.reset(new boost::thread(boost::bind(&HandshakeServiceHandler::CallbackThread, this)));
  }

boost::shared_ptr m_trans;
CallbackServiceClient m_client;
shared_ptr m_thread;
boost::once_flag m_flag;
};

class ProcessorFactoryImply : public TProcessorFactory
{
    virtual boost::shared_ptr getProcessor(
        const TConnectionInfo& connInfo)
    {
        return make_shared(make_shared(connInfo.transport));
    }
};


int main(int argc, char **argv) {
  int port = 5555;
  shared_ptr processorFactory(new ProcessorFactoryImply());
  shared_ptr serverTransport(new TServerSocket(port));
  shared_ptr transportFactory(new TBufferedTransportFactory());
  shared_ptr protocolFactory(new TBinaryProtocolFactory());
  shared_ptr threadMgr = ThreadManager::newSimpleThreadManager(30);
  boost::shared_ptr threadFactory =
      boost::shared_ptr(new PlatformThreadFactory());

  threadMgr->threadFactory(threadFactory);
  threadMgr->start();
  TThreadPoolServer server(processorFactory,serverTransport, transportFactory, protocolFactory, threadMgr);
  server.serve();
  return 0;
}

,

一个简单的thrift双向通信就实现了。

搞定thrift双向消息


推荐阅读
  • 解决 Windows Server 2016 网络连接问题
    本文详细介绍了如何解决 Windows Server 2016 在使用无线网络 (WLAN) 和有线网络 (以太网) 时遇到的连接问题。包括添加必要的功能和安装正确的驱动程序。 ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • LDAP服务器配置与管理
    本文介绍如何通过安装和配置SSSD服务来统一管理用户账户信息,并实现其他系统的登录调用。通过图形化交互界面配置LDAP服务器,确保用户账户信息的集中管理和安全访问。 ... [详细]
  • 基于iSCSI的SQL Server 2012群集测试(一)SQL群集安装
    一、测试需求介绍与准备公司计划服务器迁移过程计划同时上线SQLServer2012,引入SQLServer2012群集提高高可用性,需要对SQLServ ... [详细]
  • 网络爬虫的规范与限制
    本文探讨了网络爬虫引发的问题及其解决方案,重点介绍了Robots协议的作用和使用方法,旨在为网络爬虫的合理使用提供指导。 ... [详细]
  • 自定义滚动条美化页面内容
    当页面内容超出显示范围时,为了提升用户体验和页面美观,通常会添加滚动条。如果默认的浏览器滚动条无法满足设计需求,我们可以自定义一个符合要求的滚动条。本文将详细介绍自定义滚动条的实现过程。 ... [详细]
  • 解决Bootstrap DataTable Ajax请求重复问题
    在最近的一个项目中,我们使用了JQuery DataTable进行数据展示,虽然使用起来非常方便,但在测试过程中发现了一个问题:当查询条件改变时,有时查询结果的数据不正确。通过FireBug调试发现,点击搜索按钮时,会发送两次Ajax请求,一次是原条件的请求,一次是新条件的请求。 ... [详细]
  • 解决Parallels Desktop错误15265的方法
    本文详细介绍了在使用Parallels Desktop时遇到错误15265的多种解决方案,包括检查网络连接、关闭代理服务器和修改主机文件等步骤。 ... [详细]
  • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
  • 本文详细介绍了一种利用 ESP8266 01S 模块构建 Web 服务器的成功实践方案。通过具体的代码示例和详细的步骤说明,帮助读者快速掌握该模块的使用方法。在疫情期间,作者重新审视并研究了这一未被充分利用的模块,最终成功实现了 Web 服务器的功能。本文不仅提供了完整的代码实现,还涵盖了调试过程中遇到的常见问题及其解决方法,为初学者提供了宝贵的参考。 ... [详细]
  • 如果应用程序经常播放密集、急促而又短暂的音效(如游戏音效)那么使用MediaPlayer显得有些不太适合了。因为MediaPlayer存在如下缺点:1)延时时间较长,且资源占用率高 ... [详细]
  • MicrosoftDeploymentToolkit2010部署培训实验手册V1.0目录实验环境说明3实验环境虚拟机使用信息3注意:4实验手册正文说 ... [详细]
  • 两个条件,组合控制#if($query_string~*modviewthread&t(&extra(.*)))?$)#{#set$itid$1;#rewrite^ ... [详细]
  • 深入探索HTTP协议的学习与实践
    在初次访问某个网站时,由于本地没有缓存,服务器会返回一个200状态码的响应,并在响应头中设置Etag和Last-Modified等缓存控制字段。这些字段用于后续请求时验证资源是否已更新,从而提高页面加载速度和减少带宽消耗。本文将深入探讨HTTP缓存机制及其在实际应用中的优化策略,帮助读者更好地理解和运用HTTP协议。 ... [详细]
author-avatar
竹林映uj
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有