Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
mynote
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ClassmateWang
mynote
Commits
20af8a99
Commit
20af8a99
authored
Nov 01, 2021
by
ClassmateWang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
2021-11-1 12:33 更新\Netty\黑马\1)NIO.md
parent
416890e8
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
334 additions
and
1 deletions
+334
-1
Netty/黑马/1)NIO.md
+334
-1
No files found.
Netty/黑马/1)NIO.md
View file @
20af8a99
# 1
)NIO
# 1
)NIO
...
...
@@ -805,6 +805,8 @@ for (SocketChannel channel : list){
![
image-20211101122037522
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101122037522.png
)
### 4.2多路复用器selector
> 多路复用器的核心功能在于选择已经就绪的任务
...
...
@@ -817,7 +819,17 @@ Selector 不断的轮询注册在其上面的channel ,如果某个channel 上
>
> //todo: 什么是poll?什么是epoll?
#### 监听channel 的事件
通过下面的三种方法来监听是否有事件发生,方法的返回值代表有多少channel 发生了事件:
-
int count = selector.select();
-
阻塞直到绑定的事件发生
-
!
[
image-20211101122513989
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101122513989.png
)
-
int count = selector.select(long timeout);
-
阻塞直到绑定的事件发生或者超时
-
int count = selector.selectNow();
-
不会阻塞,也就是不管有没有事件,立即返回,根据返回值检查是否有事件
**<u>事件的类型:</u>**
...
...
@@ -983,6 +995,327 @@ ServerSocketChannel 建立之后被注册到了selector 中,从而获取到了
#### 处理消息边界
可能出现的情况:
![
image-20211101102417046
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101102417046.png
)
**<u>解决方案:</u>**
-
客户端和服务端约定Buffer长度
-
如果message 长度不够要补齐
-
会造成空间的浪费和网路资源的浪费
-
!
[
image-20211101102540796
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101102540796.png
)
-
用的很少
-
客户端发消息时使用分隔符分割消息
-
服务器预先创建一个ByteBuffer,用这个ByteBuffer 去读数据,遇到分隔符则创建一个新的ByteBuffer 去接收,然后再用ByteBuffer 去接收下一个新的数据
-
如果消息大于既定的ByteBuffer 还是需要考虑扩容的问题
-
效率不高,在传输的过程中需要一个字节一个字节的去对比,直到得到
\n
-
!
[
image-20211101102949249
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101102949249.png
)
-
用的少
-
将消息分为两部分:数据内容长度,数据内容,先发送内容长度,再接收内容
-
TLV格式,Type类型,Length大小,Value数据,http1.1的处理方式
-
LTV http2.0的处理方式
-
根据消息的长度创建对应大小的 ByteBuffer 来接收内容,动态操作
-
!
[
image-20211101103210425
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101103210425.png
)
-
因为需要提前创建ByteBuffer 准备内存资源(应用程序在运行的过程中分配资源是非常浪费时间影响效率的),当ByteBuffer 过大的时候会影响数据的效率
-
Http协议的处理方式
`通过方式二进行处理:使用分隔符处理边界`
当消息的长度小于我们预先设置的ByteBuffer 的长度时:
比如说ByteBuffer.length = 16;message = "hello
\n
world
\n
";
可以通过:
```
java
private
static
void
split
(
ByteBuffer
source
)
{
source
.
flip
();
for
(
int
i
=
0
;
i
<
source
.
limit
()
;
i
++){
/*找到一条完整的消息*/
if
(
source
.
get
(
i
)
==
'\n'
){
/*这条完整消息的长度*/
int
length
=
i
+
1
-
source
.
position
();
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
length
);
for
(
int
j
=
0
;
j
<
length
;
j
++){
buffer
.
put
(
source
.
get
());
}
}
}
source
.
compact
();
}
```
获取到消息的长度
但是如果单条消息的长度就大于ByteBuffer 的长度:
message = "0123456789abcdef123456
\n
";
![
image-20211101104833161
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101104833161.png
)
这个时候观察代码在
\n
前的"0123456789abcdef" 因为没有
\n
会被丢失掉
最终读出的数据只能是"123456
\n
"
因为这里的ByteBuffer 是局部变量,意味着在两次读事件发生的时候用到的是不同的ByteBuffer,即进入split 的是两个不同的ByteBuffer,而正常来说,应该使用同一个ByteBuffer 这样才能保证数据不被丢失
**<u>解决方法:</u>**
!
[
](https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101105127372.png)
-
扩容通过将ByteBuffer 翻倍即可
-
需要注意的是每一个SokcetChannel 都应该有一个独属于它的ByteBuffer 这样才不会乱
-
目前在代码中实现的时候会发现Buffer 无法在两次read 事件中共用,所以可以selector 提供了附件的解决思想
#### 附件attachment
-
如何保证每个Channel 每次读事件使用的都是他们独有的Buffer
-
在什么时候进行Buffer 的扩容
```
java
/*第三个参数就是附件*/
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
16
);
Selection
key
=
socketChannel
.
register
(
selector
,
0
,
buffer
);
```
在split 方法中:
```
java
source
.
compact
();
//让position = 没有读完的剩余字节数 对于没有读到\n的情况,position == limit =16
```
完整代码:
```
java
while
(
iterator
.
hasNext
()){
SelectionKey
key
=
iterator
.
next
();
/*处理key时要从selectKeys中删除,否则下次处理就会有问题*/
/**
* 从底层集合中移除此迭代器返回的最后一个元素(可选操作)。 每次调用next只能调用此方法一次
*/
iterator
.
remove
();
/*5、区分事件类型*/
if
(
key
.
isAcceptable
()){
ServerSocketChannel
channel
=
(
ServerSocketChannel
)
key
.
channel
();
SocketChannel
socketChannel
=
channel
.
accept
();
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
16
);
/*将ByteBuffer 关联到selectionKey 上*/
SelectionKey
scKey
=
socketChannel
.
register
(
selector
,
0
,
buffer
);
scKey
.
interestOps
(
SelectionKey
.
OP_READ
);
}
else
if
(
key
.
isReadable
()){
try
{
SocketChannel
socketChannel
=
(
SocketChannel
)
key
.
channel
();
/*获取附件*/
ByteBuffer
buffer
=
(
ByteBuffer
)
key
.
attachment
();
int
read
=
socketChannel
.
read
(
buffer
);
buffer
.
flip
();
if
(
read
==-
1
){
key
.
cancel
();
}
else
{
split
(
buffer
);
}
if
(
buffer
.
position
()
==
buffer
.
limit
()){
/*意味着Buffer 满了,且没有读到\n 这个时候就需要扩容*/
ByteBuffer
newBuffer
=
ByteBuffer
.
allocate
(
buffer
.
capacity
()
*
2
);
/*将旧的ByteBuffer 的内容拷贝到新的ByteBuffer中*/
buffer
.
flip
();
newBuffer
.
put
(
buffer
);
key
.
attach
(
newBuffer
);
}
}
catch
(
IOException
e
){
e
.
printStackTrace
();
key
.
cancel
();
}
}
}
```
```
java
private
static
void
split
(
ByteBuffer
source
)
{
source
.
flip
();
for
(
int
i
=
0
;
i
<
source
.
limit
()
;
i
++){
/*找到一条完整的消息*/
if
(
source
.
get
(
i
)
==
'\n'
){
/*这条完整消息的长度*/
int
length
=
i
+
1
-
source
.
position
();
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
length
);
for
(
int
j
=
0
;
j
<
length
;
j
++){
buffer
.
put
(
source
.
get
());
}
}
}
source
.
compact
();
//让position = 没有读完的剩余字节数 对于没有读到\n的情况,position == limit =16
}
```
> 这样做其实也是有一些问题的,因为ByteBuffer 这样下去就会不断的变大,并不会缩小,而Netty 的实现方式不仅有扩容,当数据变小时还会对buffer 进行自适应的调整
#### ByteBuffer大小分配
![
image-20211101111628636
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101111628636.png
)
-
思路一:拷贝法
-
Buffer 小就扩一个大的
-
思路二:组合法
-
多个Buffer 或者多个数组链表组成大的ByteBuffer
#### write事件处理
`WriteServer`
```
java
package
netty
.
t3
;
import
lombok.SneakyThrows
;
import
java.net.InetSocketAddress
;
import
java.nio.ByteBuffer
;
import
java.nio.channels.SelectionKey
;
import
java.nio.channels.Selector
;
import
java.nio.channels.ServerSocketChannel
;
import
java.nio.channels.SocketChannel
;
import
java.nio.charset.Charset
;
import
java.util.Iterator
;
/**
* @BelongsProject: JavaLearnAll
* @BelongsPackage: netty.t3
* @Author: Wang Haipeng
* @CreateTime: 2021-11-01 11:34
* @Description: 写数据服务器
*/
public
class
WriteServer
{
@SneakyThrows
public
static
void
main
(
String
[]
args
)
{
Selector
selector
=
Selector
.
open
();
ServerSocketChannel
ssc
=
ServerSocketChannel
.
open
();
ssc
.
configureBlocking
(
false
);
ssc
.
bind
(
new
InetSocketAddress
(
"localhost"
,
8081
));
ssc
.
register
(
selector
,
SelectionKey
.
OP_ACCEPT
,
ByteBuffer
.
allocate
(
16
));
while
(
true
){
selector
.
select
();
Iterator
<
SelectionKey
>
iterator
=
selector
.
selectedKeys
().
iterator
();
while
(
iterator
.
hasNext
()){
SelectionKey
key
=
iterator
.
next
();
iterator
.
remove
();
if
(
key
.
isAcceptable
()){
SocketChannel
sc
=
ssc
.
accept
();
sc
.
configureBlocking
(
false
);
StringBuilder
stringBuilder
=
new
StringBuilder
();
for
(
int
i
=
0
;
i
<
30000000
;
i
++){
stringBuilder
.
append
(
"A"
);
}
ByteBuffer
buffer
=
Charset
.
defaultCharset
().
encode
(
stringBuilder
.
toString
());
/*网络的能力有限所以需要查看buffer 是否写完然后不停的写入channel*/
while
(
buffer
.
hasRemaining
()){
int
write
=
sc
.
write
(
buffer
);
System
.
out
.
println
(
write
);
}
}
}
}
}
}
```
`WriteClient`
```
java
package
netty
.
t3
;
import
java.io.IOException
;
import
java.net.InetSocketAddress
;
import
java.nio.ByteBuffer
;
import
java.nio.channels.SocketChannel
;
/**
* @BelongsProject: JavaLearnAll
* @BelongsPackage: netty.t3
* @Author: Wang Haipeng
* @CreateTime: 2021-11-01 11:42
* @Description: 写客户端
*/
public
class
WriteClient
{
public
static
void
main
(
String
[]
args
)
throws
IOException
{
SocketChannel
sc
=
SocketChannel
.
open
();
sc
.
connect
(
new
InetSocketAddress
(
"127.0.0.1"
,
8081
));
int
count
=
0
;
while
(
true
){
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
1024
*
1024
);
count
+=
sc
.
read
(
buffer
);
System
.
out
.
println
(
count
);
buffer
.
clear
();
}
}
}
```
![
ServerConsel
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101114604923.png
)
![
ClientConsel
](
https://wangnotes.oss-cn-beijing.aliyuncs.com/notesimage/image-20211101114650518.png
)
从打印输出的效果图可以看到数据会成功的发送和成功的接收,但是因为网络对一次性发送的数据的承载能力有限(由底层操作系统进行控制),所以一个ByteBuffer 中的数据不会一次性的发送过去(因为ByteBuffer 这里用的是堆内存,而不是操作系统的直接内存),而是不断的去尝试能不能发,这样显然是不符合一个非阻塞的思想的,单线程在无法发送的时候完全可以去处理一些其他的事情而不是在这里去不断的尝试浪费资源
最好的情况就是:
`当操作系统标识现在可以写数据了,应用单线程再去处理这个写事件,将每次的写事件粒子化,或者说再次事件化`
解决方案:
```
java
if
(
key
.
isAcceptable
()){
SocketChannel
sc
=
ssc
.
accept
();
sc
.
configureBlocking
(
false
);
StringBuilder
stringBuilder
=
new
StringBuilder
();
for
(
int
i
=
0
;
i
<
30000000
;
i
++){
stringBuilder
.
append
(
"A"
);
}
ByteBuffer
buffer
=
Charset
.
defaultCharset
().
encode
(
stringBuilder
.
toString
());
int
write
=
sc
.
write
(
buffer
);
System
.
out
.
println
(
write
);
/*修改点一:*/
if
(
buffer
.
hasRemaining
()){
/*让当前的socket 再关注一个可写事件*/
key
.
interestOps
(
SelectionKey
.
OP_WRITE
+
key
.
interestOps
());
/*将没写完的数据挂到key上*/
key
.
attach
(
buffer
);
}
}
/*增加点一:*/
else
if
(
key
.
isWritable
()){
/*因为这个key 已经关注了写事件,
即使这次写事件仍然没写完
当下一次再触发写事件时任然会再进入这个else if*/
ByteBuffer
buffer
=
(
ByteBuffer
)
key
.
attachment
();
SocketChannel
sc
=
(
SocketChannel
)
key
.
channel
();
int
count
=
sc
.
write
(
buffer
);
System
.
out
.
println
(
count
);
/*清理操作*/
if
(!
buffer
.
hasRemaining
()){
/*将缓冲区从内存移除*/
key
.
attach
(
null
);
/*不需要再关注可写事件*/
key
.
interestOps
(
key
.
interestOps
()-
SelectionKey
.
OP_WRITE
);
}
}
```
### 4.3NIO的server
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment