跳到主要内容

一 生活场景

1.1 场景

  • 首先我们来看一个显示场景:在现实生活中有两个人,在进行一问一答形式的交流,了解病情等等

132641658_0_final.png

  • 语言: 如果两个人都是说的普通话,医生可以清楚的知道老人的病情,如果一个说方言,一个说普通话,那就很难搞哦,所以我们要有相同的语言,才能听懂。
  • 传输介质: 当然就是通过空气。
  • 交流方式:病人先说出自己的情况,医生给出自己的意见,这就是一个等待-循环的过程。
  • 思考:都是通过人的大脑进行语言表达。

二 信息格式

2.1 常见格式

很明显通过中文的交谈,两个人相互明白了对方的意图,在计算机领域为了保证信息能够被处理,信息也会被做成特定的格式,而且要确保目标能够明白这种格式。 image.png 常用的信息格式包括:

  • XML:可扩展标记语言,这个语言由W3C(万维网联盟)进行发布和维护。XML语言应用之广泛,扩展之丰富。适合做网络通信的信息描述格式(一般是“应用层”协议了)。例如Google 定义的XMPP通信协议就是使用XML进行描述的;不过XML的更广泛使用场景是对系统环境进行描述(因为它会造成较多的不必要的内容传输),例如服务器的配置描述、Spring的配置描述、Maven仓库描述等等。
  • JSON:JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。它和XML的设计思路是一致的:和语言无关(流行的语言都支持JSON格式描述:Go、Python、C、C++、C#、JAVA、Erlang、JavaScript等等);但是和XML不同,JSON的设计目标就是为了进行通信。要描述同样的数据,JSON格式的容量会更小。
  • protocol buffer(PB):protocolbuffer(以下简称PB)是google 的一种数据交换的格式,它独立于语言,独立于平台。google 提供了三种语言的实现:java、c++ 和 python,每一种实现都包含了相应语言的编译器以及库文件。
  • TLV(三元组编码):T(标记/类型域)L(长度/大小域)V(值/内容域),通常这种信息格式用于金融、军事领域。它通过字节的位运算来进行信息的序列化/反序列化
  • 自定义的格式:当然,如果您的两个内部系统已经约定好了一种信息格式,您当然可以使用自己定制的格式进行描述。您可以使用C++描述一个结构体,然后序列化/反序列它,或者使用一个纯文本,以“|”号分割这些字符串,然后序列化/反序列它。

三 网络传输

3.1 网络传输

具体传输过程参考:

网络传输7层

  • 物理层

解决两个硬件之间怎么通信的问题,常见的物理媒介有光纤、电缆、中继器等。它主要定义物理设备标准,如网线的接口类型、光纤的接口类型、各种传输介质的传输速率等。 它的主要作用是传输比特流(就是由1、0转化为电流强弱来进行传输,到达目的地后在转化为1、0,也就是我们常说的数模转换与模数转换)。这一层的数据叫做比特。

  • 数据链路层:

在计算机网络中由于各种干扰的存在,物理链路是不可靠的。该层的主要功能就是:通过各种控制协议,将有差错的物理信道变为无差错的、能可靠传输数据帧的数据链路。 它的具体工作是接收来自物理层的位流形式的数据,并封装成帧,传送到上一层;同样,也将来自上层的数据帧,拆装为位流形式的数据转发到物理层。这一层的数据叫做帧。

  • 网络层:

计算机网络中如果有多台计算机,怎么找到要发的那台?如果中间有多个节点,怎么选择路径?这就是路由要做的事。 该层的主要任务就是:通过路由选择算法,为报文(该层的数据单位,由上一层数据打包而来)通过通信子网选择最适当的路径。这一层定义的是IP地址,通过IP地址寻址,所以产生了IP协议。

  • 传输层:

当发送大量数据时,很可能会出现丢包的情况,另一台电脑要告诉是否完整接收到全部的包。如果缺了,就告诉丢了哪些包,然后再发一次,直至全部接收为止。 简单来说,传输层的主要功能就是:监控数据传输服务的质量,保证报文的正确传输。

  • 会话层:

虽然已经可以实现给正确的计算机,发送正确的封装过后的信息了。但我们总不可能每次都要调用传输层协议去打包,然后再调用IP协议去找路由,所以我们要建立一个自动收发包,自动寻址的功能。于是会话层出现了:它的作用就是建立和管理应用程序之间的通信。

  • 表示层:

表示层负责数据格式的转换,将应用处理的信息转换为适合网络传输的格式,或者将来自下一层的数据转换为上层能处理的格式。

  • 应用层:

应用层是计算机用户,以及各种应用程序和网络之间的接口,其功能是直接向用户提供服务,完成用户希望在网络上完成的各种工作。前端同学对应用层肯定是最熟悉的。

四 BIO

  • 上方我们以线下问诊的方式看到,一位医生只能处理一个患者信息,一位患者进了门,其他患者就只能门外等待(Wait), 依次处理
  • 但是我们能不能转变一种方式,线上问诊资讯,一个医生同时为为多个患者提供病情咨询,无须等待一个患者的回复,就可以与其他患者交流

从程序的角度来看,这就我们接下来要讲的BIO与NIO

4.1 BIO 详解

Blocking IO。每个客户端的Socket连接请求,服务端都会对应有个处理线程与之对应,对于没有分配到处理线程的连接就会被阻塞或者拒绝。相当于是一个连接一个线程。 传统的BIO通信方式存在几个问题:

  • 同一时间,服务器只能接受来自于客户端A的请求信息;虽然客户端A和客户端B的请求是同时进行的,但客户端B发送的请求信息只能等到服务器接受完A的请求数据后,才能被接受。
  • 由于服务器一次只能处理一个客户端请求,当处理完成并返回后(或者异常时),才能进行第二次请求的处理。很显然,这样的处理方式在高并发的情况下,是不能采用的。

上面说的情况是服务器只有一个线程的情况,那么读者会直接提出我们可以使用多线程技术来解决这个问题:

  • 当服务器收到客户端X的请求后,(读取到所有请求数据后)将这个请求送入一个独立线程进行处理,然后主线程继续接受客户端Y的请求。
  • 客户端一侧,也可以使用一个子线程和服务器端进行通信。这样客户端主线程的其他工作就不受影响了,当服务器端有响应信息的时候再由这个子线程通过 监听模式/观察模式(等其他设计模式)通知主线程

但是使用线程来解决这个问题实际上是有局限性的:

  • 虽然在服务器端,请求的处理交给了一个独立线程进行,但是操作系统通知accept()的方式还是单个的。也就是,实际上是服务器接收到数据报文后的“业务处理过程”可以多线程,但是数据报文的接受还是需要一个一个的来(下文的示例代码和debug过程我们可以明确看到这一点)
  • 在linux系统中,可以创建的线程是有限的。我们可以通过cat /proc/sys/kernel/threads-max 命令查看可以创建的最大线程数。当然这个值是可以更改的,但是线程越多,CPU切换所需的时间也就越长,用来处理真正业务的需求也就越少。
  • 创建一个线程是有较大的资源消耗的。JVM创建一个线程的时候,即使这个线程不做任何的工作,JVM都会分配一个堆栈空间。这个空间的大小默认为128K,您可以通过-Xss参数进行调整。
  • 当然您还可以使用ThreadPoolExecutor线程池来缓解线程的创建问题,但是又会造成BlockingQueue积压任务的持续增加,同样消耗了大量资源。
  • 另外,如果您的应用程序大量使用长连接的话,线程是不会关闭的。这样系统资源的消耗更容易失控。

那么,如果你真想单纯使用线程解决阻塞的问题,那么您自己都可以算出来您一个服务器节点可以一次接受多大的并发了。看来,单纯使用线程解决这个问题不是最好的办法。

编码模拟真实场景, 多个客户端请求服务端

  • 客户端
package com.shu.thread;

import java.util.concurrent.CountDownLatch;

/**
* @description: 客户端守护线程
* @author: shu
* @createDate: 2023/12/25 18:16
* @version: 1.0
*/
public class SocketClientDaemon {
public static void main(String[] args) throws InterruptedException {
// 线程个数
Integer clientNumber = 20;
// 计数器
CountDownLatch countDownLatch = new CountDownLatch(clientNumber);
//分别开始启动这20个客户端
for(int index = 0 ; index < clientNumber ; index++ , countDownLatch.countDown()) {
SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
new Thread(client).start();
}
//这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态
synchronized (SocketClientDaemon.class) {
SocketClientDaemon.class.wait();
}
}
}

package com.shu.thread;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;

/**
* @description: 客户端守护线程
* @author: shu
* @createDate: 2023/12/25 18:17
* @version: 1.0
*/
public class SocketClientRequestThread implements Runnable {

/**
* 日志
*/
private static final Logger LOGGER = Logger.getLogger(SocketClientRequestThread.class);


/**
* CountDownLatch是一个同步计数器,它可以使一个或多个线程等待一组事件发生后再执行。
*/
private CountDownLatch countDownLatch;

/**
* 这个线层的编号
* @param countDownLatch
*/
private Integer clientIndex;

/**
* countDownLatch是java提供的同步计数器。
* 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性
* @param countDownLatch
*/
public SocketClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) {
this.countDownLatch = countDownLatch;
this.clientIndex = clientIndex;
}

@Override
public void run() {
Socket socket = null;
OutputStream clientRequest = null;
InputStream clientResponse = null;
try {
socket = new Socket("10.255.13.78",83);
clientRequest = socket.getOutputStream();
clientResponse = socket.getInputStream();
//等待,直到SocketClientDaemon完成所有线程的启动,然后所有线程一起发送请求
this.countDownLatch.await();
//发送请求信息
clientRequest.write(("这是第" + this.clientIndex + " 个客户端的请求。").getBytes());
clientRequest.flush();
//在这里等待,直到服务器返回信息
SocketClientRequestThread.LOGGER.info("第" + this.clientIndex + "个客户端的请求发送完成,等待服务器返回信息");
int maxLen = 1024;
byte[] contextBytes = new byte[maxLen];
int realLen;
String message = "";
//程序执行到这里,会一直等待服务器返回信息(注意,前提是in和out都不能close,如果close了就收不到服务器的反馈了)
while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
message += new String(contextBytes , 0 , realLen);
}
SocketClientRequestThread.LOGGER.info("接收到来自服务器的信息:" + message);
} catch (Exception e) {
SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
} finally {
try {
if(clientRequest != null) {
clientRequest.close();
}
if(clientResponse != null) {
clientResponse.close();
}
} catch (IOException e) {
SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
}
}
}
}

  • 服务端
package com.shu.thread;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @description:
* @author: shu
* @createDate: 2023/12/25 18:22
* @version: 1.0
*/
public class SocketServer1 {
private static final Logger LOGGER = Logger.getLogger(SocketServer1.class);

public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(83);
LOGGER.info("服务器启动成功");
try {
while (true) {
Socket socket = serverSocket.accept();
//下面我们收取信息
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
Integer sourcePort = socket.getPort();
int maxLen = 2048;
byte[] contextBytes = new byte[maxLen];
//这里也会被阻塞,直到有数据准备好
int realLen = in.read(contextBytes, 0, maxLen);
//读取信息
String message = new String(contextBytes, 0, realLen);
//下面打印信息
LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);
//下面开始发送信息
out.write("回发响应信息!".getBytes());
//关闭
out.close();
in.close();
socket.close();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
}

这种方式的缺点,服务端只能一条一条的接受,并处理客户端的消息

  • 多线程服务端:当连接建立后,逻辑处理交给其他线程
package com.shu.thread;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @description: 多线程版处理客户端消息:这种方式的优点是,当有多个客户端连接时,每个客户端都会被分配一个线程来处理
* @author: shu
* @createDate: 2023/12/25 18:56
* @version: 1.0
*/
public class SocketServer2 {

private static final Logger LOGGER = Logger.getLogger(SocketServer2.class);

public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(83);
try {
while (true) {
Socket socket = serverSocket.accept();
//当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
//最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况
SocketServerThread socketServerThread = new SocketServerThread(socket);
new Thread(socketServerThread).start();
}
} catch (Exception e) {
SocketServer2.LOGGER.error(e.getMessage(), e);
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}

}

package com.shu.thread;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
* @description: 服务器线程处理类,负责处理Socket链路中的数据读写操作
* @author: shu
* @createDate: 2023/12/25 18:54
* @version: 1.0
*/
public class SocketServerThread implements Runnable {
private static final Logger LOGGER = Logger.getLogger(SocketServerThread.class);

private Socket socket;

public SocketServerThread (Socket socket) {
this.socket = socket;
}

/**
* 业务逻辑
*/
@Override
public void run() {
InputStream in = null;
OutputStream out = null;
try {
//下面我们收取信息
in = socket.getInputStream();
out = socket.getOutputStream();
Integer sourcePort = socket.getPort();
int maxLen = 1024;
byte[] contextBytes = new byte[maxLen];
//也就是说read方法处同样会被阻塞,直到操作系统有数据准备好
int realLen = in.read(contextBytes, 0, maxLen);
//读取信息
String message = new String(contextBytes , 0 , realLen);
//下面打印信息
SocketServerThread.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);
//下面开始发送信息
out.write("回发响应信息!".getBytes());
} catch(Exception e) {
SocketServerThread.LOGGER.error(e.getMessage(), e);
} finally {
//试图关闭
try {
if(in != null) {
in.close();
}
if(out != null) {
out.close();
}
if(this.socket != null) {
this.socket.close();
}
} catch (IOException e) {
SocketServerThread.LOGGER.error(e.getMessage(), e);
}
}
}
}

这种方式并没有从根本上解决阻塞的问题,那么重点的问题并不是“是否使用了多线程”,而是为什么accept()、read()方法会被阻塞。即:异步IO模式 就是为了解决这样的并发性存在的。但是为了说清楚异步IO模式,在介绍IO模式的时候,我们就要首先了解清楚,什么是 阻塞式同步、非阻塞式同步、多路复用同步模式。 记住:只要有IO的存在,就会有阻塞或非阻塞的问题,无论这个IO是网络的,还是硬盘的。这就是为什么基本的JAVA NIO框架中会有FileChannel(而且FileChannel在操作系统级别是不支持非阻塞模式的)、DatagramChannel和SocketChannel的原因。

为啥serverSocket.accept这个方法会被阻塞?

  • 服务器线程发起一个accept动作,询问操作系统 是否有新的socket套接字信息从端口X发送过来。

  • 注意,是询问操作系统。也就是说socket套接字的IO模式支持是基于操作系统的,那么自然同步IO/异步IO的支持就是需要操作系统级别的了。如下图:

  • 如果操作系统没有发现有套接字从指定的端口X来,那么操作系统就会等待。这样serverSocket.accept()方法就会一直等待。这就是为什么accept()方法为什么会阻塞:它内部的实现是使用的操作系统级别的同步IO
  • 阻塞IO 和 非阻塞IO 这两个概念是程序级别的。主要描述的是程序请求操作系统IO操作后,如果IO资源没有准备好,那么程序该如何处理的问题:前者等待;后者继续执行(并且使用线程一直轮询,直到有IO资源准备好了)
  • 同步IO 和 非同步IO,这两个概念是操作系统级别的。主要描述的是操作系统在收到程序请求IO操作后,如果IO资源没有准备好,该如何相应程序的问题:前者不响应,直到IO资源准备好以后;后者返回一个标记(好让程序和自己知道以后的数据往哪里通知),当IO资源准备好以后,再用事件机制返回给程序。

#五 通信框架

5.1 常见的通信框架

目前流行的NIO框架非常的多。在论坛上、互联网上大家讨论和使用最多的有以下几种: ● 原生JAVA NIO框架:JAVA NIO通信框架基于多路复用IO原理,我们将详细讲解它的工作原理。 ● APACHE MINA 2:是一个网络应用程序框架,用来帮助用户简单地开发高性能和高可扩展性的网络应用程序。它提供了一个通过Java NIO在不同的传输例如TCP/IP和UDP/IP上抽象的事件驱动的异步API。 ● NETTY 4/5:Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。我们将讲解NETTY 4 的工作原理。另外说一句:MANA和NETTY的主要作者是同一人Trustin Lee。(后面学习) ● Grizzly:Grizzly是一种应用程序框架,专门解决编写成千上万用户访问服务器时候产生的各种问题。使用JAVA NIO作为基础,并隐藏其编程的复杂性。