设为首页收藏本站

全球主机交流论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

搜索
热搜: discuz
查看: 1157|回复: 0
打印 上一主题 下一主题

初探Thrift客户端异步模式

[复制链接]

该用户从未签到

跳转到指定楼层
楼主
发表于 2014-1-13 13:48:04 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
背景
在某项目中,我们广泛使用thrift作为我们内部接口调用的RPC框架,而且基本上都是使用多线程请求等待应答的同步模式。但是在一些情况下(例如大数据量同步),如果可以使用异步模式,可以优化程序结构和提高模块性能。

分析

thrift有提供一套异步模式供我们使用,首先我们像往常一样编写thrift协议文件。


namespace cpp uctest
service Test{
    string pingpong(1: string data);
}

不同的是,需要加入cpp:cob_type来生成代码。生成的代码文件外表与之前的基本相同,但在Test.h和Test.cpp中内涵就丰富了,增加了异步客户端和异步服务器使用的类。
异步客户端代码有TestCobClient以及它继承的虚拟类。

class TestCobClient : virtual public TestCobClIf {
public:
  TestCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :
    channel_(channel),
    itrans_(new ::apache::thrift::transport::TMemoryBuffer()),
    otrans_(new ::apache::thrift::transport::TMemoryBuffer()),
    piprot_(protocolFactory->getProtocol(itrans_)),
    poprot_(protocolFactory->getProtocol(otrans_)) {
    iprot_ = piprot_.get();
    oprot_ = poprot_.get();
  }
  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {
    return channel_;
  }
  virtual void completed__(bool /* success */) {}
  void pingpong(std::tr1::function<void(TestCobClient* client)> cob, const std::string& data);
  void send_pingpong(const std::string& data);
  void recv_pingpong(std::string& _return);
protected:
  boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;
  boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;
  boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
  boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
  ::apache::thrift::protocol::TProtocol* iprot_;
  ::apache::thrift::protocol::TProtocol* oprot_;
};


从源文件上看,异步功能的核心在于TAsyncChannel,它是用于回调函数注册和异步收发数据。send_pingpong和recv_pingpong分别向缓冲区(TMemoryBuffer)写入和读取数据。而pingpong则通过调用TAsyncChannel的sendAndRecvMessage接口注册回调函数。

TAsyncChannel作为接口类定义了三个接口函数。

  /**
   * Send a message over the channel.
   */
  virtual void sendMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* message) = 0;
  /**
   * Receive a message from the channel.
   */
  virtual void recvMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* message) = 0;
  /**
   * Send a message over the channel and receive a response.
   */
  virtual void sendAndRecvMessage(const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* sendBuf,
    apache::thrift::transport::TMemoryBuffer* recvBuf);

TAsyncChannel目前为止(0.9.1版本)只有一种客户端实现类TEvhttpClientChannel,顾名思义它是基于libevent和http协议实现的。 使用libevent的方法就不在这里累赘了,主要看下sendAndRecvMessage的实现。

void TEvhttpClientChannel::sendAndRecvMessage(
    const VoidCallback& cob,
    apache::thrift::transport::TMemoryBuffer* sendBuf,
    apache::thrift::transport::TMemoryBuffer* recvBuf) {
  cob_ = cob;
  recvBuf_ = recvBuf;
  struct evhttp_request* req = evhttp_request_new(response, this);
  uint8_t* obuf;
  uint32_t sz;
  sendBuf->getBuffer(&obuf, &sz);
  rv = evbuffer_add(req->output_buffer, obuf, sz);
  rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());
}


通过向evhttp_request中注册相应回调函数respones和传入回调实例本身的指针,在相应时候回调函数中调用TEvhttpClientChannel实例的finish接口完成数据接收,并写入缓存中,供应用层获取使用。

实验

有文章认为:使用Thrift异步客户端需要配合使用对应异步服务器才能工作。如果这个观点成立,我们改造目前程序代码的成本就会很高,而且可能会丧失使用Thrift的便捷性。
通过上述代码的阅读,发现唯一的局限性是服务器必须使用Http的传输层,此外只需要协议层保持一致,并不需要一定使用异步服务器。
下面我们通过简单代码基于上文“uctest”的协议实现了一个异步客户端和同步服务器。
服务器代码:

class TestHandler : virtual public TestIf {
public:
  TestHandler() { }
  void pingpong(std::string& _return, const std::string& data) {
    if(data=="ping")
      printf("[%d] recv ping\n", (int)time(NULL));
  _return = "pong";
  printf("[%d] send pong\n", (int)time(NULL));
  }
};
int main(int argc, char **argv) {
  int port = 9091;
  shared_ptr<TestHandler> handler(new TestHandler());
  shared_ptr<TProcessor> processor(new TestProcessor(handler));
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new THttpServerTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  server.serve();
  return 0;
}

客户端代码:

_ping_pong(TestCobClient* client)
{
    std::string pong;
    client->recv_pingpong(pong);
    printf("[%d] recv:%s\n", (int)time(NULL), pong.c_str());
}
int main(int argc, char **argv) {
  try{
      event_base* evbase = event_base_new();
      boost::shared_ptr<TAsyncChannel> channel(new TEvhttpClientChannel("127.0.0.1", "/", "127.0.0.1", 9091, evbase));
      TestCobClient client(channel, new TBinaryProtocolFactory());
      function<void(TestCobClient* client)> cob = bind(&my_ping_pong,_1);
      client.pingpong(cob, "ping");
      printf("[%d] ping\n", (int)time(NULL));
      for(int i=0;i<5;i++)
      {
        printf("[%d] running...\n", (int)time(NULL));
        sleep(1);
      }
      event_base_dispatch(evbase);
      event_base_free(evbase);
    }
    catch(...)
    {
      printf("exception");
      return 1;
    }
    return 0;
};


运行结果如下:
[tangzheng@dev10 server]$ ./demo.serv
[1388639886] recv ping
[1388639886] send pong
[tangzheng@dev10 client]$ ./demo.client
[1388639881] ping
[1388639881] running…
[1388639882] running…
[1388639883] running…
[1388639884] running…
[1388639885] running…
[1388639886] recv:pong
达到异步客户端预期的效果。

结果

初步掌握了thrift异步客户端的用法,我们即可在需要的时候使用,或者优化当前的程序。 由于这种提供的异步模式必须基于HTTP传输层,使用有一定的局限性。之后将会继续研究是否可以在TAsyncChannel的基础上,开发支持其他传输层的接口。



分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友 微信微信
收藏收藏
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|小黑屋|Archiver|手机版|中国U网    

GMT+8, 2024-5-2 09:46 , Processed in 0.075800 second(s), 23 queries .

Powered by Discuz! X3.2

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表