最近研究Thrift了一段时间,对Thrift的server端和client端操作的代码进行了稍微封装,这样编写一个后台服务器的时候,就专心写业务逻辑就行了,对thrift内部服务器代码不用关心,因为这些代码都是通用的。如果你不知道thrift是什么东东,我之前有写一个简单
最近研究Thrift了一段时间,对Thrift的server端和client端操作的代码进行了稍微封装,这样编写一个后台服务器的时候,就专心写业务逻辑就行了,对thrift内部服务器代码不用关心,因为这些代码都是通用的。
如果你不知道thrift是什么东东,我之前有写一个简单示例:thrift入门hello示例
我同时写了几个语言版本的服务器,有php、python和java,并作了相应测试。client端用的php。
作为测试,我以thrift的IDL格式写了一个简单的服务接口 mongotest.thrift:
namespace php mongotest
namespace py mongotest
service MongoTest {
string getServerStatus();
}
这个服务功能就是查看mongodb数据库的连接状态,这样就可以测试这种开发方式连接数据库的一些行为。
下面帖出封装的server类,并写出生成一个服务端实例的代码。
java版server,ThriftServer.java
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServer.Args;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.protocol.TCompactProtocol;
public class ThriftServer {
public org.apache.thrift.TProcessor processor;
public ThriftServer(org.apache.thrift.TProcessor processor){
this.processor = processor;
}
public void startServer() {
try {
//这里可以选择TThreadPoolServer 和 TThreadedSelectorServer
Runnable threadPool = new Runnable() {
public void run() {
threadPool(processor);
}
};
//Runnable secure = new Runnable() {
//public void run() {
//secure(processor);
//}
//};
new Thread(threadPool).start();
//new Thread(secure).start();
} catch (Exception x) {
x.printStackTrace();
}
}
public void threadPool(org.apache.thrift.TProcessor processor) {
try {
TServerTransport serverTransport = new TServerSocket(9090);
//TServer server = new TSimpleServer(new Args(serverTransport).processor(processor));
// Use this for a multithreaded server
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport).processor(processor);
args.maxWorkerThreads = 50;
TServer server = new TThreadPoolServer(args);
System.out.println("Starting the server...");
server.serve();
} catch (Exception e) {
e.printStackTrace();
}
}
public void threadSelector(org.apache.thrift.TProcessor processor) {
try {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(9090);
//异步IO,需要使用TFramedTransport,它将分块缓存读取。
TTransportFactory transportFactory = new TFramedTransport.Factory();
//使用高密度二进制协议
TProtocolFactory proFactory = new TCompactProtocol.Factory();
TServer server = new TThreadedSelectorServer(
new TThreadedSelectorServer.Args(serverTransport)
.protocolFactory(proFactory)
.transportFactory(transportFactory)
.processor(processor)
);
System.out.println("Starting the server...");
server.serve();
} catch (Exception e) {
e.printStackTrace();
}
}
public void secure(org.apache.thrift.TProcessor processor) {
try {
/*
* Use TSSLTransportParameters to setup the required SSL parameters. In this example
* we are setting the keystore and the keystore password. Other things like algorithms,
* cipher suites, client auth etc can be set.
*/
TSSLTransportParameters params = new TSSLTransportParameters();
// The Keystore contains the private key
params.setKeyStore("./lib/java/test/.keystore", "thrift", null, null);
/*
* Use any of the TSSLTransportFactory to get a server transport with the appropriate
* SSL configuration. You can use the default settings if properties are set in the command line.
* Ex: -Djavax.net.ssl.keyStore=.keystore and -Djavax.net.ssl.keyStorePassword=thrift
*
* Note: You need not explicitly call open(). The underlying server socket is bound on return
* from the factory class.
*/
TServerTransport serverTransport = TSSLTransportFactory.getServerSocket(9093, 0, null, params);
//TServer server = new TSimpleServer(new Args(serverTransport).processor(processor));
// Use this for a multi threaded server
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport).processor(processor);
args.maxWorkerThreads = 50;
TServer server = new TThreadPoolServer(args);
System.out.println("Starting the secure server...");
server.serve();
} catch (Exception e) {
e.printStackTrace();
}
}
}
有了上面封装的类后,要写一个基于thrift相应服务的socket后台,就相当简单了,下面是服务器调用的例子 JavaServer.java:
import mongotest.*;
public class JavaServer {
public static void main(String[] args){
MongoTestHandler handler = new MongoTestHandler(); //之前定义的接口mongotest.thrift,具体实现类
MongoTest.Processor processor = new MongoTest.Processor(handler); //这个调用的是通过thrift自动生成的类
ThriftServer server = new ThriftServer(processor);
server.startServer(); //这里也可以把服务器的一些参数,如端口等封装进去
}
}
python版server,ThriftServer.py:
#!/usr/bin/env python
#encoding=utf-8
import sys
sys.path.append('./gen-py')
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
class ThriftServer:
def __init__(self, processor, port=9090):
self.processor = processor
self.port = port
def startServer(self):
processor = self.processor
transport = TSocket.TServerSocket(port=self.port)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
#server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
# You could do one of these for a multithreaded server
#server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)
server.daemon = True #enable ctrl+c to exit the server
server.setNumThreads(100);
#server = TServer.TForkingServer(processor, transport, tfactory, pfactory)
print 'Starting the server...'
server.serve()
print 'done.'
python版调用示例:
#!/usr/bin/env python
#encoding=utf-8
import sys
sys.path.append('./gen-py')
from ThriftServer import ThriftServer
from MongoTestHandler import MongoTestHandler
from mongotest import MongoTest
handler = MongoTestHandler()
processor = MongoTest.Processor(handler)
server = ThriftServer(processor, 9090)
server.startServer()
php版server,ThriftServer.php:
error_reporting(E_ALL);
require_once __DIR__.'/lib/Thrift/ClassLoader/ThriftClassLoader.php';
use Thrift\ClassLoader\ThriftClassLoader;
$GEN_DIR = realpath(dirname(__FILE__).'').'/gen-php';
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
$loader->register();
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TPhpStream;
use Thrift\Transport\TBufferedTransport;
use Thrift\Server\TServerSocket;
use Thrift\Server\TForkingServer;
use Thrift\Factory\TTransportFactory;
use Thrift\Factory\TBinaryProtocolFactory;
class ThriftServer {
private $processor;
private $server_ip;
private $server_port;
function __construct($processor, $server_ip="localhost", $server_port=9090){
$this->processor = $processor;
$this->server_ip = $server_ip;
$this->server_port = $server_port;
}
function startServer(){
$processor = $this->processor;
try {
$transport = new TServerSocket($this->server_ip, $this->server_port);
} catch (Exception $e) {
echo 'port already in use.';
exit();
}
$outputTransportFactory = $inputTransportFactory = new TTransportFactory($transport);
$outputProtocolFactory = $inputProtocolFactory = new TBinaryProtocolFactory();
$server = new TForkingServer(
$processor,
$transport,
$inputTransportFactory,
$outputTransportFactory,
$inputProtocolFactory,
$outputProtocolFactory
);
header('Content-Type: application/x-thrift');
print 'Starting the server...';
$server->serve();
}
}
php调用示例,PhpServer.php:
include_once __DIR__.'/gen-php/mongotest/MongoTest.php';
include_once __DIR__.'/MongoTestHandler.php';
include_once __DIR__.'/ThriftServer.php';
error_reporting(E_ALL);
$handler = new MongoTestHandler();
$processor = new \mongotest\MongoTestProcessor($handler);
$server = new ThriftServer($processor);
$server->startServer();
对于客户端,只做了php一个版本,其它语言也很容易,下面是php版client封装类,ThriftClient.php:
error_reporting(E_ALL);
require_once __DIR__.'/lib/Thrift/ClassLoader/ThriftClassLoader.php';
use Thrift\ClassLoader\ThriftClassLoader;
$GEN_DIR = realpath(dirname(__FILE__)).'/gen-php';
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
$loader->register();
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\THttpClient;
use Thrift\Transport\TBufferedTransport;
use Thrift\Exception\TException;
class ThriftClient {
public $client;
private $transport;
private $protocol;
private $client_class;
private $server_ip;
private $server_port;
private $socket;
function __construct($client_class, $server_ip='localhost', $server_port=9090){
$this->client_class = $client_class;
$this->server_ip = $server_ip;
$this->server_port = $server_port;
}
function __destruct(){
$this->transport->close();
}
public function getClient(){
$this->socket = new TSocket($this->server_ip, $this->server_port);
$this->transport = new TBufferedTransport($this->socket, 1024, 1024);
$this->protocol = new TBinaryProtocol($this->transport);
$this->client = new \mongotest\MongoTestClient($this->protocol);
$this->transport->open();
return $this->client;
}
}
?>
php版client调用示例,MongoTest-client-thrift.php:
include_once __DIR__.'/gen-php/mongotest/MongoTest.php';
include_once __DIR__.'/ThriftClient.php';
$thrift = new ThriftClient('\mongotest\MongoTestClient', 'localhost', 9090);
$client = $thrift->getClient();
$ret = $client->getServerStatus(); //调用服务方法
echo $ret;
?>
相关代码打包(省略了thrift的库和通过thrift命令自动生成的代码,及一些jar依赖包,如mongodb包)