结构化数据不应该被人工智能遗忘

零拷贝,从字面意思理解就是数据不需要来回的拷贝,大大提升了系统的性能。

Netty、Kafka中的零拷贝技术到底有多牛?

图片来自 Pexels

我们也经常在 Java NIO,Netty,Kafka,RocketMQI s c 等框架中听到零拷贝z 3 a ! ^ T ,它经常作为其提升性能的一大亮点;下{ x r / R B x面从 I/O 的几个概念开始,进而再分析零拷j t # - m _贝。

I/O 概念

缓冲区

缓冲区是所有 I/O 的基础,I/O 讲的无非就是把数据移进或移出缓冲区;进程执行 I/O 操作,^ ) = N就是向操作系统发出请求,让它要么把缓冲区的数据排干(写),要么填充缓冲区(读)。

下面看一个 Java 进程发起 Read 请求加载I m G + Q 6 & I 9数据大致的流程图:

Netty、Kafka中的零拷贝技术到底有多牛?

进程发起 Read 请求之后k [ h 4 m 7 h #,内核接收到 Read 请求之后,Z @ o r会先检N d E _ , Q l查内核空间中是否已经存在进程b p k +所需要的数据,如果已经存在,则直接把1 O . i B数据 Copy 给进程的缓冲区。

如果没有内核随即向磁盘控制器发出命令,要求从磁盘读取数据,磁盘控b # . o s r w制器把数据直? P } M x |接写入内核 Read 缓冲区,这一步通过 DMA 完成。

接下来就] ! t是内核将数据 Copy 到进程的缓冲区;如果进程发起 Write 请求,s J z h ; o 1 ; Z同样需要把用户缓冲区里面的数据 Copy 到内核的 Socket 缓冲区里面,然后再通过 DMA 把数据 Copy 到网卡中,发送出去。

你可能觉得这样挺浪E l ; P k Q b m费空间的,每次都需要把内C d n $ e : / x核空间的数据拷贝到用户空间中,所以零拷贝的出现就是为了解决这种问题的。

关于零拷贝提供了两种方式分别是:

  • mmap+wrN G i 3 A 1ite
  • Sendfile

虚拟内存

所有现代操作系l q u C G X 2 U J统都使用虚拟内存,使用虚拟的地址取代物理地址,这样做的好处是:

  • 一个以上的虚拟地址可以指向同一个n I ) s y E物理J n s K & # V内存地址。
  • 虚拟t 7 y d内存空间可大于实际可用的物理地址。

利用第一条特9 3 H N ;性可以把内核空间地址和用户空间的虚拟地址映射到同一个物理地址,这样 DMA 就可以填充对内核和用户空间进程同时可见的缓冲区了。

大致如下图所示:

Netty、Kafka中的零拷贝技术到底有多牛?

省去了内核与用户空间的往来拷贝,, u 8 M |Java 也利用操作系统的此特性来提升性能,下面重点看看 Java 对零拷贝都有哪v D 3 ] I d m些支持。

mmap+write 方式

使用 mmap+write 方式代替原来的 read+write 方式,mmap 是一种内存映射文件的方法,即将一个文件或者其他对象映射到进程的地址空间,实现文件磁盘地% L h址和进程虚拟地址空间中一段虚拟地址的一一对应关系。

这样就可以省掉原来内核 Read 缓冲区 Copy 数据到用户缓冲区,但是还是需要内核 Read 缓冲区将数据 Copy 到内核 Socket 缓冲区。

大致_ A # } i s E R如下图所示:E v ; &

Netty、Kafka中的零拷贝技术到底有多牛?

Sendfile 方式

Sendfile 系统调用在内核版本 2.1 中被引入,目的是简化通过网络在两个通道之a _ K间进行的数据传输过程。

Se7 4 2 W n z T Jndfile 系统调用的引入,不仅减少了数据复制,还减少了上下文切换的次数,大致如下图所示:

Netty、Kafka中的零拷贝技术到底有多牛?

数据传送只发生在内核空间,所以减少了一次上下文切换;但是还是存在一次 Copy,能不能把这一次 Copy 也省略掉?

Linux2.4 内核中J [ 0 4做了改进,将 Kernel buffer 中对应的数据描述信息(内存地址,偏移量)记) y ~ r @ b .录到相应的 Socket 缓冲区当中,这样连内核空间中的一次 CPU Copy 也省掉了。

Java 零拷贝

MappedByteBuffer

Java NIO 提供的 FileChannel 提供了 map() 方法,该方法可以在一个打开的文件和 MappedByteBuffer 之间建立一个虚拟内存映射。

MappedByteBuffer 继承于 ByteBuffer,类似于一个基于内存的缓冲区,只不过该对象的数据元素存储在磁盘的一个文件中。

调用 get() 方法会; y } O ! i T ) q从磁盘中获取数据,此数据反映该文件当前的内容,调用 put() 方法会更新磁盘上的文件,并且对文件做的修改对其他阅读者也是可见的。

下面看一个简单的读取实例,然后再对 MapN 8 YpedByteBuffer 进行分析:

public class&nb& 4 e 6sp;MappedByteBufferTest {   U k r I v ) (  e&nb~ } 4sp;  public static1 ) @ . b )  void main(String[] args) throws Exception {      &nbs_ m 5 R `p;  File file = na U & z Zew File(\"D://db.txt\")V - j l;   &nk q G i F z M D {bsp;     long lr K : 1 1 G #en = file.length();         byte[] ds = new byte[(int) len];         MappedByteBu* e ) i B $ %ffer mappedByteBuffer = new&n- a &bsp;FileInV G 4 + I gputStream(file).getChannel().map(FileChannel.MapMo= 6 V |de.READ_ONLY, # n K # 4 2 _ 8 |0,         ~ t J 7 J A !     &` F N ~ z o fnbsp;&N I }n5 H 4 8 ^ 9 ! 9 *bsp;&nbt  L D $sp;lS W ] 9 1 p ; + 0en);    m 9 J;   &nb* - : } isp;y  ` i N d + for (int offset = 0; offset&nbsk r t { Qp;< len; offset++)&nbO P ~ ( Qsp;{     / B x 4 G;      B W }  byte b = mappedByteBuffer.get();             d7 ~ d 1 - 5s[offset] = b;         }         S7 r ncanner&nbsL m / + 3 V j [ Gp;sc3 Y /an =&no % J Q c ~  # =bsp;new Scanner(new ByteArrayInputStream(ds)).useDelimiter(\" \");   &nbs/ * p @ Ep;     while (scan.hasNext()) {    &_ x = + W A 5 i ^nbsp; &nbsP m t ( i 8 o M *p;      System.out7 E d ` ( r 5 | b.print(scan.next() + \" \"); &f R A $ ynbsp;       }   &nbsb O N q ? p; }&nC 0 c lbsp;} 

主要通过 FileChannel 提供的 map() 来实现映射,map() 方法如下:

public abstract MappedByteBuffer map(MapMode mo# 1 t |de, &nbp [ ! g _ A H +sp;  &nb` e Jsp;   &n7 ` Nbsp;   &t 8 J T } 3 jnbsp;&nb) q h w Ksp;&nbs^ t { g n B Mp;                           lT + Q A e }ong&n| ` [ F w ? mbsp;c x E 6position, long size)   &nA 8 d . Z V .bsp; N + ( ; 6 O @ H );  ~ B u m q 6 +  throws IOException; 

分别提供了三个参数,MapMode,Position 和 Size,分别表示:

  • MapMode:映射的模式,可选项包括:READ_ONLY,READ_K 6 Z -WRITE,PRIVATE。
  • Position:从哪个位置开始映射,字节数的位置。
  • Size:从 Position 开始向后多少个字节。

重点看一下 MapMode,前两个分别表示只读和可读可写,当然请求的映射模式受到 Filechannel 对象的访问权限限制,m D G w .如果在一个没有读权限的文件上启用 READ_ONLY,将抛出 NonReadt D 5 V M 3ableChannelException。

PRIVATE 模式表示写时拷贝的映射,意味着通过 put() 方法所做的任v N m何修改都会导= L ~致产生一个私有的数据拷贝并且F & ^ h d Y Y 5该拷贝中的数据只有 MappedByteBuffer 实例可以看到。

该过程不会对底层文件做任何修改,而且一旦j M m } @ z Z缓冲区被施以垃圾收集动作(garbage collected),那些修改都会丢失。

大致浏览一下 map() 方法的源码:

public MappedByteBuffer # { $ l [;map(MapMode mode, lonP 6 v Y N Pg position, long&n/ g U zbsp;size)      &nb1 s ? I k Csp; throws IOException &z ( @ . M h B 4 znbs w @ a A j 2 4p;  {          &nbst C ( l V p; ...省略..a I $ U t.      g p c j O m U w l;      int pa[ z * O JgePo! 6 : N ( V jsition m 2 ) | 8 U k = (int)(position % allocationGranularity);        w ~ P . U (;    long mapPosition = position - pagePositio9 6 f @n;            long mapSize = size + pagePosition;        &nbW X d g lsp;   try {  &nbP e E j Asp;  &n@ 4 { ; o Q ; 3bsp;    &: 3 5 m s G f | ~nbsp;     // If no exception was thrown from map0,&nbsh = _p;the address is valid               &nb* , D $ r Bsp;addr = map0(imode, mapPosits B Z + E U ~ yion, mapSize);            } catch z C ; u P G(OutOfMemoryErrom k - a & [ R Xr x) { E u U 0 2 8               // An OutOfMemoryError may indicate that we\'ve exhausted memory     &nbs` 6 S 5 z b s H np;  { + Y F M I 2 2  &nbs{ ^ i } ,p;   P m W - b t q  /0 o */ s= } * O V g H ~ *o force gc&J @ l ! n Lnbsp;and re-attempt map  &L ( w # [ y x mnbsp;          [ 9 s i k Z I = e   System.gc();     &v [ _ 8 ]nbsp;          try v  h ` z 7 [ y{              R 7 n M &;      Thread.sleep(100);            p o J o S 0 7 u [;    } catch&nb, P B + Osp;(InterruptedException y) {    &nb; S A P   o Wsp;       K 0 w;        Thread.currentThread().interrup& u it();          @ u @ o      }          p  _ ! f ^ i;     D c 3 & l ( .; try&nbs{ R M ] X Dp;{            &I ) P : y 5 m o -nbsp;      T F F i L ( F _ z; addr = map0(imode, ) - # s I Y 0 r;mapPosition, mapSize);      ( Q = Z % & / 1    &M ! $  |nbsp;     } catch&4 . 0 H 0nbsp;(OutOfMemoryError y) E !) { &nbsP W } * @ p Q 4p;&nbs/ $ h |p;             &nY J p 4 ^bsp;&nbsr { m j :p;  // After&0 g p ~nbsp;a second OOME, fail                   3 Y _ , }; throw new IOException(\"Map failed\", y);      h F . ^ L 9 [ f        T P 9 v P M O  }      &n3 ] v X , 9 ) nbsp;     8 G n . 7;}       &nbs? = & [ D 0p;     // On Windowsg K W X M SK } * t  w Aand potentially other platforms, we need&nb& Z Z p  asp;an R # j @ J c;open    &nbp S msp;     } $ 6 n ; j D &nX U ) ] Z y Cbsp;// file descrQ # H _ - , iiptor fB @ * @ L v l c Vor some mapping op* ; G E Ferations.            FileDescriptor mfd; Z A N    &nb] L c T ssp;      try {  &nI 1 z l i .bsp;     &nU Q % : q V V r 4bsp;       mfd = nd.duplicag ~ B  * . [teForMapping(fd); [ 4 a d & % H;&nb7 L N T G ! , B ysp;          } catch (IOException ioe) {   $ k % } 3 b h 0  &nP 3 # j Z ^bsp; &nbs? Q  $p;  &nh m e 0 i Q 3bsp;&M ; r G 1nbsp; &n W { 6 J pbsp;  unmap0(ad3 q S G X |dr, mapSize);      K Q N m H 5;          throw ioe; &n: 4 + g M J ebsp; k } S % ~ 6 F 1 ); &nh ? l K C ibsp;       } &! d O - q * W I Snbsp;           assert (IOStatus.checkAa 0 9 r lll(addr)); &e W l X Z + @ & Lnbsp;          assert (addr % allocationGranularity&c x pnbsp;== 0);        p r $    int A L i ;isize =% p + + a y u Z (int)size;   &n] U absp;    &nb= e f A ! + ~sp; &- 8 ` i C 9 vnbsp; Unmapper um = new Unmapper(addra - 5 l M D O, map8 i } i JSize, isize, $ x F;mfd);         | y W l ~ z ,   if ((!writable) || (imode == MAP_RO)) {                return Util.newMappedByteBufferR(isize,         &nt % ) % ubsp;                         &nbO K o n e 1 ?sp; &nbsA S A T &p;           addr + pagePosition, &n; v Hbsp;       C + q | &nbsF r s .p;              9 1 8 _ ( 8 b H       , l ` J  A c 0 B   &nbv P H B  L 2sp;   = a % , ) P R h 0;        mF ; T ~ $fd,      v X i    &M A Vnbsp;         &n1 1 { J g - y zbsp;     f D =;                   X S G k    um);&K A 7 vnbsp;     R K } _ ~ 7 P p;      } else {                res # v Y Xturn Util.newMappedByteBuffer(isize,&nbb j g j ; ysp;      &0 q H 6 f z Knbsp;                &nc C f | D f Y `bsp3 l q C 8 , d } z;          &nbx 4 Q ~ M k K f Csp;     &G *  wnbsp; &nbsr v . z $ 5 N Wp;    addr + pagePosition,              : Y q y;                    &nbsA n 9 { } n Q /p;             mfd, } [ Z l N r;                   &nbz l | $ y a b [sp;     &nbl v Y b 2 ) msp;    ] p h @ : V J ] 3;            Y V ` . v = v     um);           5 Y u J = N c; }     } 

大致意思就是通过N Z ` I Native 方法获取内存映射的地址,如} C *果失败,手动 GC 再次映射。

最后通过内存映射的地址实例化出 MappF R Q X @ [edByteBuffer,MappedByteBX m h - & l ! -uffer 本身是一个抽象类,其实这里真正实例化d n F C ( J出来的是 DirectByteBuffel 7 J X 8 * + s vr。

DirectByteBuffer

DirectByteBuffer 继承于 MappedByteBuffer,从名字就可以猜测出开辟了一段直接的内存,并不会占用 JVM 的内存空间。

上一节中通过 File- W m + Schannel 映射出的 MappedByteBuffer 其实际也是 DirectByteBuffer,当然除了这种方式,也可以手动开辟一段空间:

ByteBuffer&nbI 2 | + Y ! 6 n nsp;directByteBuffer = ByteBuffer.allocateDirect(100); 

如上开辟F p v了 100 字节的直接内存空间。

Channel-to-Channelh 2 C 传输

经常需要从一个位置将文件传输到另外一个位置,FileChannel 提供了 transferTo() 方法用来提高传输的效率,首先看一个简单的实例:

public class Channel l l P & _ b Q iTransfer {     public static void main(String[] argv) throws ExceptionI A L {  &nbs^ E h I ;p; &! Z Lnbsp;    String files[]=new&nbso  A * m n 1p;String[1];    & | H a Snbsp;    files[0]=\"D://db.txt\";      &nbs{ l @  f { C 4p;  catFE @ . Y miles(Cha= { - 5 m t znnels.newChannel(System.out), fil^ a `es);     }      9 { 8 D W 1 m 4 G;private static void catFiles(WritableByteChannel target, String[] files)          &) X g W 1 $nbsp; {  ; throws Exception { ! G q A p C D B v   &x # inbsp;    for (int i = 0; a Q [ n a K h r hi < filM a L Y pesa Q f.length; i++) {       &nb6 ^ _ N E s T ysp; D ` ! x [ s ?    FileInputStream fis = new FileInputStream(files[i]);&nbs5 % B 3 , D , [p;N ! ^ K ^ -    ! l g / { E L  d ? & M g;&& { hnbsp;f d : ( t  &    &nbH g . : [ xsp;FileChannel channel = fis.getCN { % Y b ` ,hannel();  &nbw X 9 o E B S H ysp- P 7 G t;       && h Z c # [ 2nbsp;  channel.transferTo(0, channel.siu % Nze(), target);           &nbst * C / k . p _ _p; channel.close();             fis.close();         }&nG N & D i ~ X K hbsp;    } }X R C ( ( P o 

通过 FileChannel 的 transferTo() 方法将文件数据传输到 System.out 通道,接口定义如下:

public abstract lon] ` H y ) E L vg transferTo(long position, S G e f q (;long count,   i g ,;      { B q e;         H  w e J r: { a 7 X;          &nbf A ] T H isp; &nD w K m ~ r ( wbsp;&5 [ / * r % # n snbsp;    Writablet U WByteChannel target)      % { U;   throws IOException; 

几个参数也比较好理解,分别是开始传输的位. O z @ j q K U H置,传输的字l J D } L H节数,_ 2 1 J f Y { ~以及* ( Z N Z i `目标通道;transfeM g [ t e B & 7rTo() 允许将一个通道交叉连接到另一个通道,而不需要一个中间缓冲区来传递数据。g 8 b |

注:这里不需要中间缓冲区有两层意u E G思:第一层不需要用户空间缓冲区来拷贝内核缓冲区,另外一层两个通道都有自己的内核缓冲区,两个内核缓冲区也可以做到无需拷贝数据。

Netty 零拷贝

Netty 提供了零拷贝的 Buffer,在传输数据时,最终处理的数据会需要对单个传输的报文,进行a 4 [ R : e组合和拆分,NIO 原生的 ByteBuffer 无法做E [ % ! G i到,Netty 通过提供的 Composite(组合)和 Slice(拆分)两种 Buffer 来实现零拷贝。

看下面一张图会比较清C c 5 k E晰:

Netty、Kafka中的零拷贝技术到底有多牛?

TCP 层 HTTP 报文被分成了两个 ChannelBuffer,这两个 Buffer 对我们上层的逻辑(HTTP 处理)是没有意义的。

但是两个 ChannelBuffer 被组合起来,就成为了一个有意义的k a % + v HTTP 报文,这个报文对应的 ChannelBuffer,才8 X n |是能称之为“Message”的东西,这里用到了一个词“Virtual Buffer”; 4 g }

可以看一j V ~ 0 Y l 4 V c下 Netty 提供的 CompositeChannelBuffer 源码:

public class Compositk 8 M W @ o & % )eC& Y b p E bhannelBuffer extendsr R ? h X AbstractChannelBuffer { E : m T H;     private final ByteOrder order;     private ChannelBuffer[]N E E F j components; &nm $ : r * 2 Ybsp;   private int[] indices;     private int lae K s F _ ) BstAccessedComponentId;     private fina8 d : r y ; Tl boolean gathering;      x e q s . =public b. G Cyte&nb * H [ x xbsp;getByte(int index) {  &n6 B K % d !bsp; w w % 6 H c *;   &nD $ m Ubsp;&ne . B p 3bsp;int componentId = comp. U M P q s HonentId(index);      &nbsN E F K I y W ep;  return components[componentId].ge: & L k  ) p + atByte(index - indices[` f ? / .  * 2 /compoK ` j b ~ 5 p u AnentId]); &nbsQ 6 dp;   }     ...省略... 

Components 用来保存的就是所有接收到的 Buffer,Indices 记录每个 buffer 的起始位置,lastAccessedComponentId 记录上一次访问的 ComponentId。

CompositeChannelBuffer 并不会开辟新的内存并直接复制所有 ChannelBuffer 内容,而是直接保存了所有 Chan~ , x @ 6 m NnelBuffer 的引用,并在子 ChannelBuffer 里进d x v P = [行读写,实现了零拷贝。

其他零拷贝

RocketMQ 的消息采用顺序写到 commitlog 文件,然后利用 consume queue 文件作为索引。

RocketMQ 采用零拷贝 mmap+F s = L _write 的方式来回应 Consumer 的请求。

同样 Kafka 中存在大量的网络数据持久化到磁盘和磁盘文件通过网D b b ` . / D络发送的过程,e 3 WKafka使用了 Sendfile 零拷贝方式。

总结

零拷贝如果简单用 Java 里面对象的概率来理解的话,其实就是使用的都是对象的引用,i 4 y k每个引用对象的地方对8 d g其改变就都能改变此对象,永远只存在一_ | & G j = U份对象。

上一篇

4部“零差评”的丧尸片,《釜山行》垫底

下一篇

这些因为名字不好,而被低估的好电影,你都看过吗?

评论已经被关闭。

插入图片
返回顶部