
Thrift (II): Network Service Models

The network service models Thrift provides fall into single-threaded, multi-threaded, and event-driven designs. Viewed from another angle, they split into blocking service models and non-blocking service models:

  • Blocking service models: TSimpleServer and TThreadPoolServer.
  • Non-blocking service models: TNonblockingServer, THsHaServer, and TThreadedSelectorServer.


1.1 TServer

TServer defines a static inner class Args, which extends the abstract class AbstractServerArgs. AbstractServerArgs uses the builder pattern to supply TServer with its various factories:

public abstract class TServer {
  public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
    public Args(TServerTransport transport) {
      super(transport);
    }
  }

  public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
    final TServerTransport serverTransport;
    TProcessorFactory processorFactory;
    TTransportFactory inputTransportFactory = new TTransportFactory();
    TTransportFactory outputTransportFactory = new TTransportFactory();
    TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
    TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

    public AbstractServerArgs(TServerTransport transport) {
      serverTransport = transport;
    }
  }

  protected TProcessorFactory processorFactory_;
  protected TServerTransport serverTransport_;
  protected TTransportFactory inputTransportFactory_;
  protected TTransportFactory outputTransportFactory_;
  protected TProtocolFactory inputProtocolFactory_;
  protected TProtocolFactory outputProtocolFactory_;
  private boolean isServing;

  protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
    processorFactory_ = args.processorFactory;
    serverTransport_ = args.serverTransport;
    inputTransportFactory_ = args.inputTransportFactory;
    outputTransportFactory_ = args.outputTransportFactory;
    inputProtocolFactory_ = args.inputProtocolFactory;
    outputProtocolFactory_ = args.outputProtocolFactory;
  }

  public abstract void serve();

  public void stop() {}

  public boolean isServing() {
    return isServing;
  }

  protected void setServing(boolean serving) {
    isServing = serving;
  }
}
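Because each setter on AbstractServerArgs returns the Args instance itself, configuration calls can be chained. Below is a minimal sketch of this builder-style usage; MyService and MyServiceHandler are hypothetical stand-ins for a generated Thrift stub and its Iface implementation, while processor() and protocolFactory() are real setters on AbstractServerArgs:

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;

public class SimpleServerDemo {
  public static void main(String[] args) throws TTransportException {
    // MyService/MyServiceHandler are hypothetical: a generated stub and its implementation.
    TServerTransport serverTransport = new TServerSocket(9090);
    TServer.Args serverArgs = new TServer.Args(serverTransport)
        .processor(new MyService.Processor<>(new MyServiceHandler()))
        .protocolFactory(new TBinaryProtocol.Factory());
    TServer server = new TSimpleServer(serverArgs);
    server.serve(); // blocks until stop() is called
  }
}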

TServer exposes three methods: serve(), stop(), and isServing().

  • serve() starts the server.
  • stop() shuts the server down.
  • isServing() reports whether the server is currently running.

Each TServer implementation starts up differently, so serve() is declared abstract. Not every server needs a graceful shutdown, so stop() has an empty default implementation instead of being abstract.
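Since serve() blocks its calling thread, a common pattern is to run it on a background thread and trigger shutdown via stop(). A sketch, continuing the hypothetical example above:

// Run the blocking serve() call on its own thread.
Thread serverThread = new Thread(server::serve, "thrift-server");
serverThread.start();

// ... later, from another thread ...
System.out.println("serving: " + server.isServing());
server.stop(); // only effective for implementations that override stop()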

1.1.1 TSimpleServer

TSimpleServer uses the simplest blocking I/O model. Its implementation is concise and easy to follow, but it can only accept and process one socket connection at a time, so its throughput is low. It is mainly useful for demonstrating how Thrift works and is rarely used in real projects.

  • Workflow: accept a connection, serve all of its requests on the main thread, then move on to the next connection.

  • Key method serve():
@Override
public void serve() {
  try {
    // Bind the server socket and start listening
    serverTransport_.listen();
  } catch (TTransportException ttx) {
    LOGGER.error("Error occurred during listening.", ttx);
    return;
  }

  // Run the preServe event
  if (eventHandler_ != null) {
    eventHandler_.preServe();
  }

  setServing(true);

  while (!stopped_) {
    TTransport client = null;
    TProcessor processor = null;
    TTransport inputTransport = null;
    TTransport outputTransport = null;
    TProtocol inputProtocol = null;
    TProtocol outputProtocol = null;
    ServerContext connectionContext = null;
    try {
      // Block until a client connects
      client = serverTransport_.accept();
      if (client != null) {
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        if (eventHandler_ != null) {
          connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
        }
        while (true) {
          if (eventHandler_ != null) {
            eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
          }
          processor.process(inputProtocol, outputProtocol);
        }
      }
    } catch (TTransportException ttx) {
      // Client died, just move on
      LOGGER.debug("Client Transportation Exception", ttx);
    } catch (TException tx) {
      if (!stopped_) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      }
    } catch (Exception x) {
      if (!stopped_) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
    }

    if (eventHandler_ != null) {
      eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
    }

    if (inputTransport != null) {
      inputTransport.close();
    }

    if (outputTransport != null) {
      outputTransport.close();
    }
  }
  setServing(false);
}

What serve() does:

  1. Calls listen() on the TServerSocket to start listening for connections.
  2. Accepts client connections in a blocking fashion; each incoming connection yields a TTransport channel object.
  3. Creates the processor, input transport, output transport, input protocol, and output protocol objects for the client.
  4. Invokes the TServerEventHandler hooks and the processor to handle each request.
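For completeness, a client talking to this server would use a plain blocking TSocket; a minimal sketch, again assuming the hypothetical MyService stub (exception handling omitted):

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

TTransport transport = new TSocket("localhost", 9090);
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
MyService.Client client = new MyService.Client(protocol); // hypothetical generated client
// ... issue RPC calls on `client` ...
transport.close();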

1.1.2 TThreadPoolServer

TThreadPoolServer also works with blocking sockets: the main thread blocks waiting for new connections, while the actual business processing is handed to a thread pool. By introducing that pool, TThreadPoolServer fixes TSimpleServer's inability to serve concurrent connections; the resulting model is one thread per connection. The relevant source follows.

private static ExecutorService createDefaultExecutorService(Args args) {
  return new ThreadPoolExecutor(
      args.minWorkerThreads,
      args.maxWorkerThreads,
      60L,
      TimeUnit.SECONDS,
      new SynchronousQueue<>(),
      new ThreadFactory() {
        final AtomicLong count = new AtomicLong();

        @Override
        public Thread newThread(Runnable r) {
          Thread thread = new Thread(r);
          thread.setDaemon(true);
          thread.setName(
              String.format("TThreadPoolServer WorkerProcess-%d", count.getAndIncrement()));
          return thread;
        }
      });
}
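Note that the default pool is backed by a SynchronousQueue, which holds no backlog: once maxWorkerThreads workers are all busy, execute() rejects the task and the connection is closed (see the execute() method below). A sketch of tuning the pool bounds through Args, reusing the hypothetical processor from the earlier example; minWorkerThreads() and maxWorkerThreads() are real setters on TThreadPoolServer.Args:

import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;

TServerTransport serverTransport = new TServerSocket(9090);
TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(serverTransport);
poolArgs.processor(processor);  // hypothetical processor from the earlier example
poolArgs.minWorkerThreads(5);   // core pool size
poolArgs.maxWorkerThreads(100); // upper bound on concurrent connections
TServer server = new TThreadPoolServer(poolArgs);
server.serve();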
  • serve() method:
@Override
public void serve() {
  if (!preServe()) {
    return;
  }
  execute();
  executorService_.shutdownNow();
  if (!waitForShutdown()) {
    LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit);
  }
  setServing(false);
}

protected void execute() {
  while (!stopped_) {
    try {
      TTransport client = serverTransport_.accept();
      try {
        executorService_.execute(new WorkerProcess(client));
      } catch (RejectedExecutionException ree) {
        if (!stopped_) {
          LOGGER.warn(
              "ThreadPool is saturated with incoming requests. Closing latest connection.");
        }
        client.close();
      }
    } catch (TTransportException ttx) {
      if (!stopped_) {
        LOGGER.warn("Transport error occurred during acceptance of message", ttx);
      }
    }
  }
}


  1. Calls listen() on the TServerSocket to start listening for connections.
  2. Accepts client connections in a blocking fashion; each incoming connection's transport is wrapped in a WorkerProcess object (WorkerProcess implements Runnable) and submitted to the thread pool.
  3. WorkerProcess.run() performs the business processing, creating the processor, input transport, output transport, input protocol, and output protocol objects for the client.
  4. The TServerEventHandler hooks fire around each request, just as in TSimpleServer; the per-connection processing itself is essentially unchanged.
@Override
public void run() {
  TProcessor processor = null;
  TTransport inputTransport = null;
  TTransport outputTransport = null;
  TProtocol inputProtocol = null;
  TProtocol outputProtocol = null;

  Optional<TServerEventHandler> eventHandler = Optional.empty();
  ServerContext connectionContext = null;

  try {
    processor = processorFactory_.getProcessor(client_);
    inputTransport = inputTransportFactory_.getTransport(client_);
    outputTransport = outputTransportFactory_.getTransport(client_);
    inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
    outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);

    eventHandler = Optional.ofNullable(getEventHandler());

    if (eventHandler.isPresent()) {
      connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol);
    }

    while (true) {
      if (Thread.currentThread().isInterrupted()) {
        LOGGER.debug("WorkerProcess requested to shutdown");
        break;
      }
      if (eventHandler.isPresent()) {
        eventHandler.get().processContext(connectionContext, inputTransport, outputTransport);
      }
      // This process cannot be interrupted by Interrupting the Thread. This
      // will return once a message has been processed or the socket timeout
      // has elapsed, at which point it will return and check the interrupt
      // state of the thread.
      processor.process(inputProtocol, outputProtocol);
    }
  } catch (Exception x) {
    logException(x);
  } finally {
    if (eventHandler.isPresent()) {
      eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol);
    }
    if (inputTransport != null) {
      inputTransport.close();
    }
    if (outputTransport != null) {
      outputTransport.close();
    }
    if (client_.isOpen()) {
      client_.close();
    }
  }
}

Advantages of TThreadPoolServer: it separates the accept thread from the worker threads, so data reading and business processing are both handled by the pool, and new connections can still be accepted promptly under heavy load. The thread-pool model suits servers whose maximum number of concurrent clients is known in advance; every request is then picked up by a worker thread immediately, and performance is very good.

Disadvantages of TThreadPoolServer: its throughput is bounded by the pool's capacity. When the number of concurrent requests exceeds the number of threads in the pool, new requests have to wait.

1.1.3 TNonblockingServer

TNonblockingServer is also single-threaded, but it uses NIO: with the Channel/Selector machinery, it processes I/O as events. All sockets are registered with a selector, and one thread polls them in a loop. On each pass, it handles every socket that is ready: sockets with incoming data are read, sockets with pending output are written, and an event on the listening socket produces a new business socket that is registered with the selector.

private void select() {
  try {
    // wait for io events.
    selector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // if the key is marked Accept, then it has to be the server
      // transport.
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

Advantages of TNonblockingServer: compared with TSimpleServer, the gain comes from I/O multiplexing. TNonblockingServer uses non-blocking I/O and monitors accept/read/write events across many sockets at once.

Disadvantages of TNonblockingServer: business processing still runs sequentially on the single thread. When a handler is slow, for example an RPC that performs a long database query, the whole server stalls, because the queued calls are still executed one after another.
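A minimal construction sketch, reusing the hypothetical processor from earlier. The server transport must be a TNonblockingServerTransport, and clients must frame their messages with TFramedTransport (as the THsHaServer section below also notes):

import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;

TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);
TNonblockingServer.Args nbArgs = new TNonblockingServer.Args(transport);
nbArgs.processor(processor); // hypothetical processor from the earlier example
TServer server = new TNonblockingServer(nbArgs);
server.serve();

// Client side must use a framed transport, e.g.:
// TTransport clientTransport = new TFramedTransport(new TSocket("localhost", 9090));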

1.1.4 THsHaServer

Given TNonblockingServer's weakness, THsHaServer extends it and adds a thread pool to raise the concurrency of task processing. THsHaServer follows the half-sync/half-async pattern: the async half handles the I/O events (accept/read/write), while the sync half runs the business handler for each RPC synchronously. Note that THsHaServer, like TNonblockingServer, requires the underlying transport to be TFramedTransport. What THsHaServer adds on top of TNonblockingServer is the worker pool:

protected static ExecutorService createInvokerPool(Args options) {
  int minWorkerThreads = options.minWorkerThreads;
  int maxWorkerThreads = options.maxWorkerThreads;
  int stopTimeoutVal = options.stopTimeoutVal;
  TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

  LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
  ExecutorService invoker =
      new ThreadPoolExecutor(
          minWorkerThreads, maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

  return invoker;
}

Advantages of THsHaServer: compared with TNonblockingServer, THsHaServer hands the business processing to a thread pool once the data has been read, so the main thread immediately returns to the next loop iteration and throughput improves substantially.

Disadvantages of THsHaServer: the main thread still has to perform all accepting, reading, and writing itself. When many concurrent requests are sending large payloads, new connections on the listening socket may not be accepted in time.
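A construction sketch under the same assumptions as before; minWorkerThreads()/maxWorkerThreads() are the Args setters backing the fields read in createInvokerPool() above (present in recent Thrift versions):

import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;

TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);
THsHaServer.Args hshaArgs = new THsHaServer.Args(transport);
hshaArgs.processor(processor); // hypothetical processor from the earlier example
hshaArgs.minWorkerThreads(5);  // bounds for the invoker pool shown above
hshaArgs.maxWorkerThreads(20);
TServer server = new THsHaServer(hshaArgs);
server.serve();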

1.1.5 TThreadedSelectorServer

TThreadedSelectorServer extends THsHaServer's approach by moving the read/write I/O events out of the main thread into dedicated selector threads, while keeping a worker thread pool; it is likewise a half-sync/half-async model. It is the most advanced threaded server model Thrift currently provides, and it is made up of the following parts:

  1. One AcceptThread, dedicated to handling new connections on the listening socket.
  2. Several SelectorThread objects that handle the network I/O of the business sockets; all network reads and writes are performed by these threads.
  3. One SelectorThreadLoadBalancer, which decides which SelectorThread a new connection accepted by the AcceptThread is assigned to.
  4. One ExecutorService worker pool. When a SelectorThread detects a call request on a business socket, it reads the request data and hands it to a pool thread, which performs the actual invocation; the handler callback for each RPC runs synchronously here.

  • Core components

The three components above, AcceptThread, SelectorThread, and ExecutorService, are defined in the source as follows. Because a dedicated AcceptThread handles new connections, TThreadedSelectorServer can respond promptly to a large number of concurrent connection requests; and because network I/O is spread across multiple SelectorThreads, reads and writes are fast, so it copes well with I/O-heavy workloads.

// The thread handling all accepts
private AcceptThread acceptThread;

// Threads handling events on client transports
private final Set<SelectorThread> selectorThreads = new HashSet<>();

// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the selector thread(s) to the workers
// (if any).
private final ExecutorService invoker;
  • Default number of selector threads for network I/O (selectorThreads): 2
  • Default number of worker threads for business processing (workerThreads): 5
  • Size of each selector thread's queue of accepted connections (acceptQueueSizePerThread): 4
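A sketch of overriding these defaults through TThreadedSelectorServer.Args, with the same hypothetical processor as before; selectorThreads(), workerThreads(), and acceptQueueSizePerThread() are the corresponding Args setters:

import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;

TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);
TThreadedSelectorServer.Args tssArgs = new TThreadedSelectorServer.Args(transport);
tssArgs.processor(processor);        // hypothetical processor from the earlier example
tssArgs.selectorThreads(4);          // threads doing read/write I/O
tssArgs.workerThreads(10);           // threads running the handler callbacks
tssArgs.acceptQueueSizePerThread(8); // accepted-connection queue per selector thread
TServer server = new TThreadedSelectorServer(tssArgs);
server.serve();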

startThreads() creates, initializes, and starts the AcceptThread and the SelectorThreads, wiring the selector threads into a SelectorThreadLoadBalancer:

@Override
protected boolean startThreads() {
  try {
    for (int i = 0; i < args.selectorThreads; ++i) {
      selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
    }
    acceptThread =
        new AcceptThread(
            (TNonblockingServerTransport) serverTransport_,
            createSelectorThreadLoadBalancer(selectorThreads));
    for (SelectorThread thread : selectorThreads) {
      thread.start();
    }
    acceptThread.start();
    return true;
  } catch (IOException e) {
    LOGGER.error("Failed to start threads!", e);
    return false;
  }
}