Commit 224f0c00 by ClassmateWang

2021-11-08 13:47 更新\Netty\黑马

parent 20af8a99
# 2)NIO-多线程
> 现在CPU都是多线程的,不应该让CPU的资源白白浪费掉
>
> ![image-20211101150537124](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101150537124.png)
>
> 创建一个BOSS线程只负责处理连接,而其他的worker 线程去处理读事件或者写事件,但是并不是说像非阻塞下的多线程模式下,不停的去创建线程,而是让线程的数量尽可能与CPU核的数量是一致的,并且多个channel 的读事件或者写事件其实也可以交由worker 中的selector 去处理
## 一、Boss-Worker 模型
![image-20211102105739815](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211102105739815.png)
存在一个问题就是在主线程和worker 线程的register中都会共用一个selector,而这个selector 因为在worker线程中先执行就会导致register 阻塞,必须等到selector 不被阻塞的情况下,才能register
这样不会报错但是注意这个问题。
也就是`多线程中注册key必须在select 方法前进行`
![image-20211102110141120](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211102110141120.png)
必须先执行register 方法,然后再执行select 方法
<u>**解决方法一**</u>
- 通过消息队列在线程间进行任务的传输
```java
package netty.t4;
import lombok.SneakyThrows;
import netty.t1.ByteBufferUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @BelongsProject: JavaLearnAll
* @BelongsPackage: netty.t4
* @Author: Wang Haipeng
* @CreateTime: 2021-11-01 15:08
* @Description: 多线程服务器
*/
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.bind(new InetSocketAddress("localhost", 8081));
ssc.register(boss, SelectionKey.OP_ACCEPT, ByteBuffer.allocate(16));
/*1、创建固定数量的worker*/
Worker readWorker = new Worker("worker-read");
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
/*在boss线程中由ServerSocketChannel 触发*/
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
readWorker.register(sc);
}
}
}
}
}
class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
/**
* 这里要表示的是,对于整个项目来说是所有的channel 读事件由一个线程去处理
* 而不是一个channel 用一个线程去处理
*/
private boolean start = false; // 还未初始化
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
public void register(SocketChannel sc) throws IOException {
if(!start){
/*初始化Thread*/
this.thread = new Thread(this,name);
this.selector = Selector.open();
this.thread.start();
this.start = true;
}
/*向队列中添加一个任务,但是这个任务并没有在boss中立即执行*/
queue.add(()->{
//2、将socketChannel关联到selector
try {
sc.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
/*此时select 方法因为没有事件发生被阻塞掉,
所以register 方法也无法调用发生,
此时这里处于boss线程中,相当于要给worker 线程发消息,
所以这里要唤醒worker 线程*/
selector.wakeup();
}
/**
* 这个Worker 只去关注读事件
*/
@SneakyThrows
@Override
public void run() {
while (true){
selector.select();
Runnable task = queue.poll();
if (task != null){
/*相当于在这里完成了key 的注册*/
task.run();
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
}
}
public Selector getSelector() {
return selector;
}
}
```
**<u>方法二:(简化)</u>**
- 直接在boss 线程注册key 的时候唤醒selector 完成register的注册
![image-20211102112530058](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211102112530058.png)
## 二、多worker模型
创建多个worker,当线程一来的时候让他找worker[0],当线程二来的时候让他找worker[1],实现轮询的效果,这是一种负载均衡的实现方式。
- 这里worker 尽量设置成为cpu 的核数
- 动态获取CPU的核心数
- ```java
/*动态设置线程数为CPU核心数*/
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
```
- ![image-20211102114228376](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211102114228376.png)
- 如果IO用的多一些,CPU用的少一些,可以查看阿姆达尔定律计算出合适的数量
```java
/*1、创建固定数量的worker*/
Worker[] workers = new Worker[2];
for (int i = 0 ; i < workers.length; i++){
workers[i] = new Worker("worker-"+i);
}
/*2、创建自增的index*/
AtomicInteger index = new AtomicInteger();
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
/*在boss线程中由ServerSocketChannel 触发*/
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
/*3、每次自增完模数组长度得到的余数就只有0,1两种情况,就能将channel均匀的分到不同的线程上*/
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
}
```
# 3)概念刨析
# 3)概念刨析
## 一、Stream和channel
- Stream **不会自动缓冲数据**,channel 会利用系统提供的发送缓冲区,接收缓冲区(**更为底层**
- Stream 仅支持阻塞API,channel **同时支持阻塞,非阻塞API**,网络channel可配合selector 实现多路复用
- 二者均为**全双工**,即**读写可以同时进行**
## 二、网络编程IO模型
> 参考《UNIX 网络编程-卷一》
- 阻塞IO
- 从网络读取数据涉及到从用户空间转向Linux 内核空间的一个过程
- 用户线程被阻塞,在数据读取的期间线程无法继续向下运行,必须得等到有了数据才能继续向下运行
- ![image-20211102120313509](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211102120313509.png)
- 非阻塞IO
- 不断的调read 方法检测有没有数据,没有数据就返回一个0,用户线程并没有停下
- ![image-20211102120538122](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211102120538122.png)
- 复制数据的时候用户线程仍然会被阻塞掉,直到拿到数据,这种非阻塞其实是在等待数据阶段非阻塞,在复制数据阶段还是阻塞的,但是时间上这个复制数据的阻塞时间几乎为无
- 其实并不一定比Block 要好,因为涉及多次用户空间和内核空间的切换还是涉及到很多资源的浪费
- 多路复用
- 一次性的处理多个IO上的事件
- ![image-20211102121113284](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211102121113284.png)
- 信号驱动
- 异步IO
- 同步:线程自己去获取结果(一个线程),上面四个都是同步的
- 异步:线程自己不去获取结果,由其他线程去送结果(至少两个线程)
- ![image-20211102121553978](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211102121553978.png)
- 用户线程直接通知操作系统要read,不去管是不是真的读到数据,是不是有数据,此时方法直接返回,等客户端送来了数据,读取完用一个线程送回来
- 异步是非阻塞的,不存在异步阻塞的问题,没有这种模型和情况
从网络读取数据是通过操作系统完成的,
## 三、零拷贝
> 需求:假设现在要从磁盘中读取一个文件然后通过socket api 进行传输
代码实现:
```java
File file = new File("hello/word.txt");
RandomAccessFile ranFile = new RandomAccessFile(file,"r");
byte[] bytes = new byte[(int)file.length];
file.read(buf);
Socket socket = ....;
socket.getOutputSttream().write(buf);
```
![image-20211104090350629](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211104090350629.png)
1. java 本身是<u>不具备IO读写能力</u>的,因此read 方法调用后,会从java 程序的用户态切换至核心态,去调用操作系统的读能力,首先从磁盘中读入数据到内核缓冲区,这期间用户线程会阻塞,操作系统通过DMA(Direct Memory Access)来实现文件读,期间并不会使用CPU,然后会将数据复制到用户缓冲区,从内核态切换为用户态,然后需要写socket 的时候,再从用户态切换为内核态进行写socket,将数据从用户缓冲区复制到socket缓冲区,然后再从socket 缓冲区将数据写到网卡,这个过程同样不会使用CPU
> DMA 是用于解放CPU进行IO操作的
这个过程中进行了三次状态切换,数据拷贝了4次,这个过程非常的繁杂消耗的资源也非常多。
2. NIO 优化
- 通过DirectByteBuffer
- ByteBuffer.allocate(10) return HeapByteBuffer 使用的是Java 内存
- ByteBuffer.allocateDirect(10) return DirectByteBuffer 使用的是操作系统内存,不仅操作系统java 和操作系统都可以访问这块内存
- ![image-20211104091558196](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211104091558196.png)
- java 可以使用DirectByteBuffer 将堆内存映射到jvm 内存中来直接访问使用,这样就能变相的减少一次数据的拷贝,但状态转换并没有减少
- 这块内存不受jvm 垃圾回收的影响,因此内存固定,有助于IO的读写
- java 中的DirectByteBuffer 对象仅维护了此内存的虚引用,内存回收分为两步
- DirectByteBuffer 被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用访问堆外内存
**<u>进一步优化:</u>**
![image-20211104092414742](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211104092414742.png)
- 只发生了一次用户态和内核态的切换
- 数据拷贝了3次
![image-20211104092539403](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211104092539403.png)
- 直接从内核缓冲区发送到网卡
- 进行了两次数据拷贝,一次状态转换,**所谓零拷贝并不是真正的没有数据拷贝的过程,而是不会将数据重复拷贝到jvm 内存中**
零拷贝的优点:
- 状态切换变少了
- 通过DMA完成IO,而不会占用cpu,可以解放CPU
- 适合小文件的传输
## 四、AIO
![image-20211104093009474](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211104093009474.png)
![image-20211104093042781](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211104093042781.png)
异步IO在linux 下是伪异步,真正实现NIO比较好的是Windows 下的,linux 下只是在API层面实现了异步IO的编写方式,但是实际上在底层上还是用的多路复用的同步非阻塞模型,并没有实际的提高性能,还提高了程序编写的复杂度。
但是并不是说异步IO没有,异步IO是真正的非阻塞,真正的可以继续向下执行等待运行结果即可
\ No newline at end of file
# 4)Netty-api
# 4)Netty-api
## 一、概述
### 1.1Netty 是什么?
Netty 是一个异步(多线程)的、基于事件驱动(使用多路复用技术)的网络应用框架,用于开发快速可维护,高性能的网络服务器和客户端
### 1.2Netty 的地位
Netty 在Java 网络应用框架的地位就好比:Spring在JavaEE中的地位
这些框架都用了Netty:基本上涉及到网络通信都会用到Netty
![image-20211104094823481](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211104094823481.png)
### 1.3Netty的优势
- Netty 基于NIO实现的
- NIO的基础上自己开发,工作量大,bug也多
- 如果要自己构建需要做的事情:
- 需要自己构建应用层协议
- 需要解决TCP传输问题,比如说黏包半包现象
- NIO自身的Bug 在一些情况下epoll 空轮询会导致CPU 100%
- 对NIO的API进行增强,更为易用也更加强大
- 其他网络框架
- Netty 的版本迭代速度更快,API更简洁,文档更优秀,兼容性更好
- 发展了很多年4.x 2013
- 5.x 被废弃了,因为引入AIO失败,性能没有提升,维护成本高
选择框架的考虑:
- API兼容性好
- 没有Bug
- 开发快速
## 二、Hello World
### 2.1 目标需求
开发一个简单的服务器端和客户端
- 客户端向服务器发送HelloWorld
- 服务器仅接收不返回,实现单向通信
导入依赖:
```xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
```
`服务器`
```java
package netty.t5;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
/**
* @BelongsProject: JavaLearnAll
* @BelongsPackage: netty.t5
* @Author: Wang Haipeng
* @CreateTime: 2021-11-04 10:25
* @Description: 服务器
*/
public class HelloServer {
public static void main(String[] args) {
/*1、启动器,负责组装Netty 组件,启动服务器*/
new ServerBootstrap()
//2、BossEventLoop WorkerEventLoop(selector,thread) group包含selector和主副线程
.group(new NioEventLoopGroup())
/*3、选择服务器的ServerSocket实现,NIO,OIO(BIO),epoll*/
.channel(NioServerSocketChannel.class) //accept netty内部进行了处理,如果有accrpt会调用initChannel方法
/*4、child= worker 处理事件的时候需要进行分工 负责处理读写,决定了worker(child)能做些什么*/
.childHandler(
/*5、和客户端连接后进行数据读写的通道,Initializer 初始化,负责添加别的Handler*/
/*处理器只有当连接建立之后才会执行*/
new ChannelInitializer<NioSocketChannel>() {
@Override
/*建立连接后会调用这个初始化方法*/
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
/*6、添加具体handler*/
/*将ByteBuf 转为字符串*/
nioSocketChannel.pipeline().addLast(new StringDecoder());
/*自定义handler*/
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
/*读事件*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/*打印转换好的字符串*/
System.out.println(msg);
}
});
}
/*7、绑定监听端口*/
}).bind(8080);
}
}
```
`客户端`
```java
package netty.t5;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
/**
* @BelongsProject: JavaLearnAll
* @BelongsPackage: netty.t5
* @Author: Wang Haipeng
* @CreateTime: 2021-11-04 10:39
* @Description: 客户端
*/
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
/*1、启动器*/
new Bootstrap()
/*2、添加EventLoop*/
.group(new NioEventLoopGroup())
/*3、选择客户端channel 实现*/
.channel(NioSocketChannel.class)
/*4、添加针对channel 的数据处理器*/
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
/*将字符串转换为ByteBuf*/
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
/*5、连接到服务器*/
.connect(new InetSocketAddress("localhost",8080))
/*同步方法(阻塞)直到连接建立才会执行*/
.sync()
/*代表底层客户端和服务端的连接对象*/
.channel()
/*6、向服务器发送数据,凡是收发数据都需要走handler,走到处理器内部*/
.writeAndFlush("hello world");
}
}
```
> 概念建立:
>
> - 在自定义的Handler 中的msg,理解为流动的数据,最开始输入的是**ByteBuf**,但经过**pipline 的加工**,就可以变成其他类型对象,最后输出又输出ByteBuf,handler 就是针对数据的一道道工序,可以对数据进行加工
> - handler 理解为数据的处理工序
> - 工序有多道,合在一起就是pipline,pipline 负责发布事件(读、读取完成....),然后传播给每个handler,handler 只对自己感兴趣的事件进行处理(重写了相应事件的处理方法)
> - handler分为
> - Inbound
> - Outbound
> - pipline 流水线,对数据的处理有的时候一道工序不够有的时候需要多道工序,而这个就像流水线多道工序处理数据
> - 把eventLoop 理解为处理数据的工人
> - 工人可以管理多个Channel 的IO操作,并且一旦一个工人负责了某个channel,就要负责到底
> -
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment