在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    圖解 Kafka 網(wǎng)絡(luò)層實(shí)現(xiàn)機(jī)制之上篇

    圖解 Kafka 網(wǎng)絡(luò)層實(shí)現(xiàn)機(jī)制之上篇

    在上一篇中,主要帶大家深度剖析了「 生產(chǎn)者元數(shù)據(jù) 」的拉取、管理全流程,今天我們就來(lái)聊聊 Kafka 是如何對(duì) Java NIO 進(jìn)行封裝的 ,本系列總共分為3篇,主要剖析以下幾個(gè)問(wèn)題:

  • 針對(duì) Java NIO 的 SocketChannel,kafka 是如何封裝統(tǒng)一的傳輸層來(lái)實(shí)現(xiàn)最基礎(chǔ)的網(wǎng)絡(luò)連接以及讀寫(xiě)操作的?
  • 剖析 KafkaChannel 是如何對(duì)傳輸層、讀寫(xiě) buffer 操作進(jìn)行封裝的?
  • 剖析工業(yè)級(jí) NIO 實(shí)戰(zhàn):如何基于位運(yùn)算來(lái)控制事件的監(jiān)聽(tīng)以及拆包、粘包是如何實(shí)現(xiàn)的?
  • 剖析 Kafka 是如何封裝 Selector 多路復(fù)用器的?
  • 剖析 Kafka 封裝的 Selector 是如何初始化并與 Broker 進(jìn)行連接以及網(wǎng)絡(luò)讀寫(xiě)的?
  • 剖析 Kafka 網(wǎng)絡(luò)發(fā)送消息和接收響應(yīng)的整個(gè)過(guò)程是怎樣的?
  • 本篇只討論前3個(gè)問(wèn)題,剩余的放到后2篇中。

    認(rèn)真讀完這篇文章,我相信你會(huì)對(duì) Kafka 封裝 Java NIO 源碼有更加深刻的理解。

    這篇文章干貨很多,希望你可以耐心讀完。

    01 總體概述

    上篇剖析了「 生產(chǎn)者元數(shù)據(jù)的拉取和管理的全過(guò)程 」,此時(shí)發(fā)送消息的時(shí)候就有了元數(shù)據(jù),但是還沒(méi)有進(jìn)行網(wǎng)絡(luò)通信,而網(wǎng)絡(luò)通信是一個(gè)相對(duì)復(fù)雜的過(guò)程,對(duì)于 Java 系統(tǒng)來(lái)說(shuō)網(wǎng)絡(luò)通信一般會(huì)采用 NIO 庫(kù)來(lái)實(shí)現(xiàn),所以 Kafka 對(duì) Java NIO 封裝了統(tǒng)一的框架,來(lái)實(shí)現(xiàn)多路復(fù)用的網(wǎng)絡(luò) I/O 操作 。

    為了方便大家理解,所有的源碼只保留骨干。

    02 Kafka 對(duì) Java NIO 的封裝

    如果大家對(duì) Java NIO 不了解的話,可以看下這個(gè)文檔,這里就不過(guò)多介紹了。

    https://pdai.tech/md/java/io/java-io-nio.html

    我們來(lái)看看 Kafka 對(duì) Java NIO 組件做了哪些封裝? 這里先說(shuō)下結(jié)果,后面會(huì)深度剖析。

  • TransportLayer:它是一個(gè)接口,封裝了底層 NIO 的 SocketChannel。
  • NetworkReceive:封裝了 NIO 的 ByteBuffer 中的讀 Buffer, 對(duì)網(wǎng)絡(luò)編程中的粘包、拆包經(jīng)典實(shí)現(xiàn) 。
  • NetworkSend:封裝了 NIO 的 ByteBuffer 中的寫(xiě) Buffer。
  • KafkaChannel:對(duì) TransportLayer、NetworkReceive、NetworkSend 進(jìn)一步封裝,屏蔽了底層的實(shí)現(xiàn)細(xì)節(jié),對(duì)上層更友好。
  • KafkaSelector:封裝了 NIO 的 Selector 多路復(fù)用器組件。
  • 接下來(lái)我們挨個(gè)對(duì)上面組件進(jìn)行剖析。

    02 TransportLayer 封裝過(guò)程

    TransportLayer 接口是對(duì) NIO 中 「 SocketChannel 」 的封裝。它的實(shí)現(xiàn)類(lèi)總共有 2 個(gè):

  • PlaintextTransportLayer:明文網(wǎng)絡(luò)傳輸實(shí)現(xiàn)。
  • SslTransportLayer:SSL 加密網(wǎng)絡(luò)傳輸實(shí)現(xiàn)。
  • 本篇只剖析 PlaintextTransportLayer 的實(shí)現(xiàn)。

    github 源碼地址如下:

    https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.javapublic class PlaintextTransportLayer implements TransportLayer { // java nio 中 SelectionKey 事件 private final SelectionKey key; // java nio 中的SocketChannel private final SocketChannel socketChannel; // 安全相關(guān) private final Principal principal = KafkaPrincipal.ANONYMOUS; // 初始化 public PlaintextTransportLayer(SelectionKey key) throws IOException { // 對(duì) NIO 中 SelectionKey 類(lèi)的對(duì)象引用 this.key = key; // 對(duì) NIO 中 SocketChannel 類(lèi)的對(duì)象引用 this.socketChannel = (SocketChannel) key.channel(); }}

    從上面代碼可以看出,該類(lèi)就是 對(duì)底層 NIO 的 socketChannel 封裝引用 。將構(gòu)造函數(shù)的 SelectionKey 類(lèi)對(duì)象賦值給 key,然后從 key 中取出對(duì)應(yīng)的 SocketChannel 賦值給 socketChannel,這樣就完成了初始化工作。

    接下來(lái),我們看看幾個(gè)重要方法是如何使用這2個(gè) NIO 組件的。

    02.1 finishConnect()

    @Override// 判斷網(wǎng)絡(luò)連接是否完成public boolean finishConnect() throws IOException { // 1. 調(diào)用socketChannel的finishConnect方法,返回該連接是否已經(jīng)連接完成 boolean connected = socketChannel.finishConnect(); // 2. 如果網(wǎng)絡(luò)連接完成以后就刪除對(duì)OP_CONNECT事件的監(jiān)聽(tīng),同時(shí)添加對(duì)OP_READ事件的監(jiān)聽(tīng) if (connected) // 事件操作 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); // 3. 最后返回網(wǎng)絡(luò)連接 return connected;}

    該方法主要用來(lái) 判斷網(wǎng)絡(luò)連接是否完成 ,如果完成就關(guān)注 「 OP_READ 」 事件,并取消 「 OP_CONNECT 」 事件。

  • 首先調(diào)用 socketChannel 通道的 finishConnect() 判斷連接是否完成。
  • 如果網(wǎng)絡(luò)連接完成以后就刪除對(duì) OP_CONNECT 事件的監(jiān)聽(tīng),同時(shí)添加對(duì) OP_READ 事件的監(jiān)聽(tīng),因?yàn)檫B接完成后就可能接收數(shù)據(jù)了。
  • 最后返回網(wǎng)絡(luò)連接 connected。
  • 二進(jìn)制位運(yùn)算事件監(jiān)聽(tīng)

    這里通過(guò)「 二進(jìn)制位運(yùn)算 」巧妙的解決了網(wǎng)絡(luò)事件的監(jiān)聽(tīng)操作,實(shí)現(xiàn)非常經(jīng)典。

    通過(guò) socketChannel 在 Selector 多路復(fù)用器注冊(cè)事件返回 SelectionKey ,SelectionKey 的類(lèi)型包括:

  • OP_READ:可讀事件,值為:1<<0 == 1 == 00000001。
  • OP_WRITE:可寫(xiě)事件,值為:1<<2 == 4 == 00000100。
  • OP_CONNECT:客戶端連接服務(wù)端的事件,一般為創(chuàng)建 SocketChannel 客戶端 channel,值為:1<<3 == 8 ==00001000。
  • OP_ACCEPT:服務(wù)端接收客戶端連接的事件,一般為創(chuàng)建 ServerSocketChannel 服務(wù)端 channel,值為:1<<4 == 16 == 00010000。
  • key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

    首先” “符號(hào)代表按位取反,”&”代表按位取與,通過(guò) key.interestOps() 獲取當(dāng)前的事件,然后和 OP_CONNECT事件取反「 11110111 」 后按位與操作。

    所以,”& xx” 代表刪除 xx 事件, 有就刪除,沒(méi)有就不變 ;而 “| xx” 代表將 xx 事件添加進(jìn)去。

    02.2 read()

    @Overridepublic int read(ByteBuffer dst) throws IOException { // 調(diào)用 NIO 的通道實(shí)現(xiàn)數(shù)據(jù)的讀取 return socketChannel.read(dst);}

    該方法主要用來(lái) 把 socketChannel 里面的數(shù)據(jù)讀取緩沖區(qū) ByteBuffer 里 ,通過(guò)調(diào)用 socketChannel.read() 實(shí)現(xiàn)。

    02.3 write()

    @Overridepublic int write(ByteBuffer src) throws IOException { return socketChannel.write(src);}

    該方法主要用來(lái) 把緩沖區(qū) ByteBuffer 的數(shù)據(jù)寫(xiě)到 SocketChannel 里 ,通過(guò)調(diào)用 socketChannel.write() 實(shí)現(xiàn)。

    大家都知道在網(wǎng)絡(luò)編程中,一次讀寫(xiě)操作并一定能把數(shù)據(jù)讀寫(xiě)完,所以就需要判斷是否讀寫(xiě)完成,勢(shì)必會(huì)涉及數(shù)據(jù)的「 拆包 」、「 粘包 」操作。 這些操作比較繁瑣,因此 Kafka 將 ByteBuffer 的讀寫(xiě)操作進(jìn)行重新封裝,分別對(duì)應(yīng) NetworkReceive 讀操作、NetworkSend 寫(xiě)操作,對(duì)于上層調(diào)用無(wú)需判斷是否讀寫(xiě)完成,更加友好 。

    接下來(lái)我們就來(lái)分別剖析下這2個(gè)類(lèi)的實(shí)現(xiàn)。

    03 NetworkReceive 封裝過(guò)程

    public class NetworkReceive implements Receive { …. // 空 ByteBuffer private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); private final String source; // 存儲(chǔ)響應(yīng)消息數(shù)據(jù)長(zhǎng)度 private final ByteBuffer size; // 響應(yīng)消息數(shù)據(jù)的最大長(zhǎng)度 private final int maxSize; // ByteBuffer 內(nèi)存池 private final MemoryPool memoryPool; // 已讀取字節(jié)大小 private int requestedBufferSize = -1; // 存儲(chǔ)響應(yīng)消息數(shù)據(jù)體 private ByteBuffer buffer; // 初始化構(gòu)造函數(shù) public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) { this.source = source; // 分配4個(gè)字節(jié)大小的數(shù)據(jù)長(zhǎng)度 this.size = ByteBuffer.allocate(4); this.buffer = null; // 能接收消息的最大長(zhǎng)度 this.maxSize = maxSize; this.memoryPool = memoryPool; }}

  • EMPTY_BUFFER:空 Buffer,值為 ByteBuffer.allocate(0)。
  • source:final類(lèi)型,用來(lái)確定對(duì)應(yīng) channel id。
  • size:final類(lèi)型,存儲(chǔ)響應(yīng)消息數(shù)據(jù)長(zhǎng)度,大小為4字節(jié)。
  • maxSize:final類(lèi)型,接收響應(yīng)消息數(shù)據(jù)的最大長(zhǎng)度。
  • memoryPool:final類(lèi)型,ByteBuffer 內(nèi)存池。
  • requestedBufferSize:已讀取字節(jié)大小。
  • buffer:存儲(chǔ)響應(yīng)消息數(shù)據(jù)體。
  • 從屬性可以看出,包含2個(gè) ByteBuffer,分別是 size 和 buffer。這里重點(diǎn)說(shuō)下源碼中的 size字段 的初始化。通過(guò)長(zhǎng)度編碼方式實(shí)現(xiàn),上來(lái)就先分配了 4字節(jié) 大小的 ByteBuffer 來(lái)存儲(chǔ)響應(yīng)消息數(shù)據(jù)長(zhǎng)度,即32位,與 Java int 占用相同的字節(jié)數(shù),完全滿足表示消息長(zhǎng)度的值。

    介紹完字段后,我們來(lái)深度剖析下該類(lèi)的幾個(gè)重要的方法。

    03.1 readFrom()

    public long readFrom(ScatteringByteChannel channel) throws IOException { // 讀取數(shù)據(jù)總大小 int read = 0; // 1.判斷響應(yīng)消息數(shù)據(jù)長(zhǎng)度的 ByteBuffer 是否讀完 if (size.hasRemaining()) { // 2.還有剩余,直接讀取消息數(shù)據(jù)的長(zhǎng)度 int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); // 3.每次讀取后,累加到總讀取數(shù)據(jù)大小里 read += bytesRead; // 4.判斷響應(yīng)消息數(shù)據(jù)長(zhǎng)度的緩存是否讀完了 if (!size.hasRemaining()) { // 5.重置position size.rewind(); // 6.讀取響應(yīng)消息數(shù)據(jù)長(zhǎng)度 int receiveSize = size.getInt(); // 7.如果有異常就拋出 if (receiveSize maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); // 8.將讀到數(shù)據(jù)長(zhǎng)度賦值已讀取字節(jié)大小,即數(shù)據(jù)體的大小 requestedBufferSize = receiveSize; if (receiveSize == 0) { buffer = EMPTY_BUFFER; } } } // 9.如果數(shù)據(jù)體buffer還沒(méi)有分配,且響應(yīng)消息數(shù)據(jù)頭已讀完 if (buffer == null && requestedBufferSize != -1) { // 10.分配requestedBufferSize字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體buffer buffer = memoryPool.tryAllocate(requestedBufferSize); if (buffer == null) log.trace("Broker low on memory – could not allocate buffer of size {} for source {}", requestedBufferSize, source); } // 11.判斷buffer是否分配成功 if (buffer != null) { // 12.把channel里的數(shù)據(jù)讀到buffer中 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); // 13.累計(jì)讀取數(shù)據(jù)總大小 read += bytesRead; } // 14. 返回總大小 return read;}

    該方法主要用來(lái) 把對(duì)應(yīng) channel 中的數(shù)據(jù)讀到 ByteBuffer 中 ,包括響應(yīng)消息數(shù)據(jù)長(zhǎng)度的 size 和響應(yīng)消息數(shù)據(jù)體長(zhǎng)度的 buffer,可能會(huì)被多次調(diào)用,每次都需要判斷 size 和 buffer 的狀態(tài)并讀取。

    在讀取時(shí),先讀取4字節(jié)到 size 中,再根據(jù) size 的大小為 buffer 分配內(nèi)存,然后讀滿整個(gè) buffer 時(shí)就表示讀取完成了。

    通過(guò)短短的30行左右代碼就解決了工業(yè)級(jí)「 拆包 」 、「 粘包 」 問(wèn)題,相當(dāng)?shù)慕?jīng)典 。

    如果要解決「 粘包 」問(wèn)題,就是在每個(gè)響應(yīng)數(shù)據(jù)中間插入一個(gè)特殊的字節(jié)大小的「 分隔符 」,這里就在響應(yīng)消息體前面插入4個(gè)字節(jié),代表響應(yīng)消息自己本身的數(shù)據(jù)大小,如下圖所示:

    具體「 拆包 」的操作步驟如下:

  • 調(diào)用 size.hasRemaining() 返回 position 至 limit 之間的字節(jié)大小 來(lái)判斷響應(yīng)消息數(shù)據(jù)長(zhǎng)度的 ByteBuffer 是否讀完。
  • 當(dāng)未讀完則通過(guò)調(diào)用 NIO 的方法 channel.read(size), 直接把讀取4字節(jié)的響應(yīng)消息數(shù)據(jù)的長(zhǎng)度寫(xiě)入到 ByteBuffer size 中 ,如果已經(jīng)讀取到了4字節(jié),此時(shí) position=4,與 limit 相同, 表示 ByteBuffer size 已經(jīng)讀滿了 。
  • 每次讀取后,累加到總讀取數(shù)據(jù)大小里
  • 再次判斷響應(yīng)消息數(shù)據(jù)長(zhǎng)度的緩存是否讀完了。
  • 如果讀完了,先重置 position 位置為0,此時(shí)就可以從 ByteBuffer 中讀取數(shù)據(jù)了,然后 調(diào)用 size.getInt() 從 ByteBuffer 當(dāng)前 position 位置讀取4個(gè)字節(jié),并轉(zhuǎn)化成int 類(lèi)型數(shù)值賦給 receiveSize ,即響應(yīng)體的長(zhǎng)度。
  • 如果有異常就拋出,包括響應(yīng)數(shù)據(jù)體的長(zhǎng)度無(wú)效或者大于最大長(zhǎng)度等。
  • 將讀到響應(yīng)數(shù)據(jù)長(zhǎng)度賦值 requestedBufferSize,即數(shù)據(jù)體的大小。
  • 如果響應(yīng)數(shù)據(jù)體 buffer 還沒(méi)有分配,且響應(yīng)數(shù)據(jù)頭已讀完,分配 requestedBufferSize 字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體 buffer。
  • 如果 buffer 分配成功, 表示 size 已讀完,此時(shí)直接把 channel 里的響應(yīng)數(shù)據(jù)讀到跟它大小一致的 ByteBuffer 中 ,再次累計(jì)讀取數(shù)據(jù)總大小。
  • 最后返回?cái)?shù)據(jù)總大小。
  • 03.2 complete()

    @Overridepublic boolean complete() { // 響應(yīng)消息頭已讀完 && 響應(yīng)消息體已讀完 return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}

    該方法主要用來(lái)判斷是否都讀取完成, 即響應(yīng)頭大小和響應(yīng)體大小都讀取完 。

    03.3 size()

    // 返回大小public int size() { return payload().limit() + size.limit();}public ByteBuffer payload() { return this.buffer;}

    該方法主要用來(lái)返回 響應(yīng)頭和響應(yīng)體還有多少數(shù)據(jù)需要讀出 。

    此時(shí)已經(jīng)剖析完讀 Buffer 的封裝,接下來(lái)我們看看寫(xiě) Buffer。

    04 NetworkSend 封裝過(guò)程

    github 源碼地址如下:

    https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.javahttps://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.javahttps://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Send.java

    調(diào)用關(guān)系圖如下:

    04.1 Send 接口

    我們先看一下接口 Send 都定義了哪些方法。

    public interface Send { // 要把數(shù)據(jù)寫(xiě)入目標(biāo)的 channel id String destination(); // 要發(fā)送的數(shù)據(jù)是否發(fā)送完了 boolean completed(); // 把數(shù)據(jù)寫(xiě)到對(duì)應(yīng) channel 中 long writeTo(GatheringByteChannel channel) throws IOException; // 發(fā)送數(shù)據(jù)的大小 long size();}

    Send 作為要發(fā)送數(shù)據(jù)的接口, 子類(lèi) ByteBufferSend 實(shí)現(xiàn) complete() 方法用于判斷是否已經(jīng)發(fā)送完成,實(shí)現(xiàn) writeTo() 方法來(lái)實(shí)現(xiàn)寫(xiě)入數(shù)據(jù)到Channel中。

    04.2 ByteBufferSend 類(lèi)

    ByteBufferSend 類(lèi)實(shí)現(xiàn)了 Send 接口, 即實(shí)現(xiàn)了數(shù)據(jù)從 ByteBuffer 數(shù)組發(fā)送到 channel :

    public class ByteBufferSend implements Send { private final String destination; // 總共要寫(xiě)多少字節(jié)數(shù)據(jù) private final int size; // 用于寫(xiě)入channel里的ByteBuffer數(shù)組,說(shuō)明kafka一次最大傳輸字節(jié)是有限定的 protected final ByteBuffer[] buffers; // 總共還剩多少字節(jié)沒(méi)有寫(xiě)完 private int remaining; private boolean pending = false; public ByteBufferSend(String destination, ByteBuffer… buffers) { this.destination = destination; this.buffers = buffers; for (ByteBuffer buffer : buffers) remaining += buffer.remaining(); // 計(jì)算需要寫(xiě)入字節(jié)的總和 this.size = remaining; }}

    我們來(lái)看下這個(gè)類(lèi)中的幾個(gè)重要字段:

  • destination:數(shù)據(jù)寫(xiě)入的目標(biāo) channel id。
  • size:總共需要往 channel 里寫(xiě)多少字節(jié)數(shù)據(jù)。
  • buffers:ByteBuffer數(shù)組類(lèi)型,用來(lái)存儲(chǔ)要寫(xiě)入 channel 里的數(shù)據(jù)。
  • remaining:ByteBuffer數(shù)組所有的ByteBuffer 還剩多少字節(jié)沒(méi)有寫(xiě)完。
  • 介紹完字段后,我們來(lái)深度剖析下該類(lèi)的幾個(gè)重要的方法。

    04.2.1 writeTo()

    @Override// 將字節(jié)流數(shù)據(jù)寫(xiě)入到channel中public long writeTo(GatheringByteChannel channel) throws IOException { // 1.調(diào)用nio底層write方法把buffers寫(xiě)入傳輸層返回寫(xiě)入的字節(jié)數(shù) long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); // 2.計(jì)算還剩多少字節(jié)沒(méi)有寫(xiě)入傳輸層 remaining -= written; // 每次發(fā)送 都檢查是否 pending = TransportLayers.hasPendingWrites(channel); return written;}

    該方法主要用來(lái) 把 buffers 數(shù)組寫(xiě)入到 SocketChannel里 ,因?yàn)樵诰W(wǎng)絡(luò)編程中,寫(xiě)一次不一定可以完全把數(shù)據(jù)都寫(xiě)成功,所以調(diào)用底層 channel.write(buffers) 方法會(huì)返回「 已經(jīng)寫(xiě)入成功多少字節(jié) 」的返回值,這樣調(diào)用一次后就知道已經(jīng)寫(xiě)入多少字節(jié)了。

    04.2.2 some other

    @Overridepublic String destination() { // 返回對(duì)應(yīng)的channel id return destination;}@Overridepublic boolean completed() { // 判斷是否完成 即沒(méi)有剩余&pending=false return remaining <= 0 && !pending;}/** * always returns false as there will be not be any * pending writes since we directly write to socketChannel. */@Overridepublic boolean hasPendingWrites() { // 在PLAINTEXT下 pending 始終為 false return false;}@Overridepublic long size() { // 返回寫(xiě)入字節(jié)的總和 return this.size;}

    04.3 NetworkSend 類(lèi)

    NetworkSend 類(lèi)繼承了 ByteBufferSend 類(lèi),真正用來(lái)寫(xiě) Buffer。

    public class NetworkSend extends ByteBufferSend { // 實(shí)例化 public NetworkSend(String destination, ByteBuffer buffer) { // 調(diào)用父類(lèi)的方法初始化 super(destination, sizeBuffer(buffer.remaining()), buffer); } // 用來(lái)構(gòu)造4個(gè)字節(jié)的 sizeBuffer private static ByteBuffer sizeBuffer(int size) { // 先分配一個(gè)4個(gè)字節(jié)的ByteBuffer ByteBuffer sizeBuffer = ByteBuffer.allocate(4); // 寫(xiě)入size長(zhǎng)度值 sizeBuffer.putInt(size); // 重置 position sizeBuffer.rewind(); // 返回 sizeBuffer return sizeBuffer; }}

    該類(lèi)相對(duì)簡(jiǎn)單些,就是構(gòu)建一個(gè)發(fā)往 channel 對(duì)應(yīng)的節(jié)點(diǎn) id 的消息數(shù)據(jù),它的實(shí)例化過(guò)程如下:

  • 先分配一個(gè)4個(gè)字節(jié)的 ByteBuffer 的變量 sizeBuffer,再把要發(fā)送的數(shù)據(jù)長(zhǎng)度賦值給 sizeBuffer。
  • 此時(shí) sizeBuffer 的響應(yīng)頭字節(jié)數(shù)和 sizeBuffer 的響應(yīng)數(shù)據(jù)就都有了。
  • 然后調(diào)用父類(lèi) ByteBufferSend 的方法進(jìn)行初始化。
  • 另外 ByteBuffer[] 為兩個(gè) buffer,可以理解為一個(gè)消息頭 buffer 即 size,一個(gè)消息體 buffer。消息頭 buffer 的長(zhǎng)度為4byte,存放的是消息體 buffer 的長(zhǎng)度。而消息體 buffer 是上層傳入的業(yè)務(wù)數(shù)據(jù),所以 send 就是持有一個(gè)待發(fā)送的 ByteBuffer 。

    接下來(lái)我們來(lái)看看 KafkaChannel 是如何對(duì)上面幾個(gè)類(lèi)進(jìn)行封裝的。

    05 KafkaChannel 封裝過(guò)程

    github 源碼地址如下:

    https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.javapublic class KafkaChannel implements AutoCloseable { …. // 節(jié)點(diǎn) id private final String id; // 傳輸層對(duì)象 private final TransportLayer transportLayer; …. // 最大能接收請(qǐng)求的字節(jié)數(shù) private final int maxReceiveSize; // 內(nèi)存池,用來(lái)分配指定大小的 ByteBuffer private final MemoryPool memoryPool; // NetworkReceive 類(lèi)的實(shí)例 private NetworkReceive receive; // NetworkSend 類(lèi)的實(shí)例 private Send send; // 是否關(guān)閉連接 private boolean disconnected; …. // 連接狀態(tài) private ChannelState state; // 需要連接的遠(yuǎn)端地址 private SocketAddress remoteAddress; // 初始化 public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator,int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) { this.id = id; this.transportLayer = transportLayer; this.authenticatorCreator = authenticatorCreator; this.authenticator = authenticatorCreator.get(); this.networkThreadTimeNanos = 0L; this.maxReceiveSize = maxReceiveSize; this.memoryPool = memoryPool; this.metadataRegistry = metadataRegistry; this.disconnected = false; this.muteState = ChannelMuteState.NOT_MUTED; this.state = ChannelState.NOT_CONNECTED; }}

    我們來(lái)看下這個(gè)類(lèi)中的幾個(gè)重要字段:

  • id:channel 對(duì)應(yīng)的節(jié)點(diǎn) id。
  • transportLayer:傳輸層對(duì)象。
  • maxReceiveSize:最大能接收請(qǐng)求的字節(jié)數(shù)。
  • memoryPool:內(nèi)存池,用來(lái)分配指定大小的 ByteBuffer。
  • receive:NetworkReceive 類(lèi)的實(shí)例。
  • send:NetworkSend 類(lèi)的實(shí)例。
  • disconnected:是否關(guān)閉連接。
  • state:KafkaChannel 的狀態(tài)。
  • remoteAddress:需要連接的遠(yuǎn)端地址。
  • 從屬性可以看出, 有3個(gè)最重要的成員變量:TransportLayer、NetworkReceive、Send 。KafkaChannel 通過(guò) TransportLayer 進(jìn)行讀寫(xiě)操作,NetworkReceive 用來(lái)讀取,Send 用來(lái)寫(xiě)出。

    為了封裝普通和加密的Channel「 TransportLayer根據(jù)網(wǎng)絡(luò)協(xié)議的不同,提供不同的子類(lèi) 」而對(duì)于 KafkaChannel 提供統(tǒng)一的接口,「 這是策略模式很好的應(yīng)用 」。

  • 每個(gè) NetworkReceive 代表一個(gè)單獨(dú)的響應(yīng),KafkaChannel 讀取的數(shù)據(jù)會(huì)存儲(chǔ)到 NetworkReceive 中,當(dāng) NetworkReceive 讀滿,一個(gè)請(qǐng)求就完整讀取了。
  • 每個(gè) Send 代表一個(gè)單獨(dú)的請(qǐng)求,需要寫(xiě)出時(shí)只需賦值此變量,之后調(diào)用 write() 方法將其中的數(shù)據(jù)寫(xiě)出。
  • 介紹完字段后,我們來(lái)深度剖析下其 網(wǎng)絡(luò)讀寫(xiě)操作 是如何實(shí)現(xiàn)的?

    05.1 setSend()

    public void setSend(Send send) { if (this.send != null) throw new IllegalStateException(“Attempt to begin a send operation with prior send operation still in progress, connection id is ” + id); // 設(shè)置要發(fā)送消息的字段 this.send = send; // 調(diào)用傳輸層增加寫(xiě)事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}// PlaintextTransportLayer 類(lèi)方法@Overridepublic void addInterestOps(int ops) { //通過(guò) key.interestOps() | ops 來(lái)添加事件 key.interestOps(key.interestOps() | ops);}

    該方法主要用來(lái) 預(yù)發(fā)送,即在發(fā)送網(wǎng)絡(luò)請(qǐng)求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中 ,然后調(diào)用傳輸層方法增加對(duì)這個(gè) channel 上「 OP_WRITE 」事件的關(guān)注。當(dāng)真正執(zhí)行發(fā)送的時(shí)候,會(huì)從 send 中讀取數(shù)據(jù)。

    05.2 write()

    public long write() throws IOException { // 判斷 send 是否為空,如果為空表示已經(jīng)發(fā)送完畢了 if (send == null) return 0; midWrite = true; // 調(diào)用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去 return send.writeTo(transportLayer);}

    該方法主要用來(lái) 把保存在 send 上的數(shù)據(jù)真正發(fā)送出去 。

  • 首先判斷要發(fā)送的 send 是否為空,如果為空則表示在 KafkaChannel 的 Buffer 的數(shù)據(jù)都發(fā)送完畢了。
  • 如果不為空就調(diào)用ByteBufferSend.writeTo() 方法通過(guò)網(wǎng)絡(luò) I/O 操作將數(shù)據(jù)發(fā)送出去。
  • 05.3 read()

    public long read() throws IOException { // 如果receive為空表示數(shù)據(jù)已經(jīng)讀完,需要重新實(shí)例化對(duì)象 if (receive == null) { // 確保分配了 NetworkReceive receive = new NetworkReceive(maxReceiveSize, id, memoryPool); } //如果未讀完,嘗試讀取該對(duì)象 long bytesReceived = receive(this.receive); if (this.receive.requiredMemoryAmountKnown() && !this.receive.memoryAllocated() && isInMutableState()) { //pool must be out of memory, mute ourselves. mute(); } return bytesReceived;}

    該方法主要用來(lái) 把從網(wǎng)絡(luò)I/O操作中讀出的數(shù)據(jù)保存到 NetworkReceive 中 。

  • 判斷 receive 是否為空,如果為空 表示上次已讀完 ,需要重新實(shí)例化 NetworkReceive 對(duì)象。
  • 如果 receive 不為空, 表示未讀完,此時(shí)讀取的還是原先的 NetworkReceive 對(duì)象 ,然后再調(diào)用 receive() 方法嘗試把 channel 的數(shù)據(jù)讀到 NetworkReceive 對(duì)象中。
  • 最后返回讀到的字節(jié)數(shù)。
  • 05.4 maybeCompleteReceive()

    public NetworkReceive maybeCompleteReceive() { if (receive != null && receive.complete()) { receive.payload().rewind(); NetworkReceive result = receive; receive = null; return result; } return null;}// NetworkReceivepublic boolean complete() { return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}

    該方法主要用來(lái) 判斷數(shù)據(jù)已經(jīng)讀取完畢了 ,而判斷是否讀完的條件是 NetworkReceive 里的 buffer 是否用完 ,包括上面說(shuō)過(guò)的表示響應(yīng)消息頭 size ByteBuffer 和響應(yīng)消息體本身的 buffer ByteBuffer。這兩個(gè)都讀完才算真正讀完了。

  • 當(dāng) buffer 讀完后調(diào)用 rewind 重置 position位置。
  • 將 receive 賦值給結(jié)果集 result
  • 此時(shí)讀完后將 receive 清空,以便下次讀。
  • 最后返回結(jié)果集 result,完成一次讀操作。
  • 05.5 maybeCompleteSend()

    // 可能完成發(fā)送public Send maybeCompleteSend() { if (send != null && send.completed()) { midWrite = false; transportLayer.removeInterestOps(SelectionKey.OP_WRITE); Send result = send; send = null; return result; } return null;}// PlaintextTransportLayer 類(lèi)方法@Overridepublic void removeInterestOps(int ops) { // 通過(guò) key.interestOps() & ~ops 來(lái)刪除事件 key.interestOps(key.interestOps() & ~ops);}// ByteBufferSend@Overridepublic boolean completed() { return remaining <= 0 && !pending;}

    該方法主要用來(lái) 是否寫(xiě)數(shù)據(jù)完畢了 ,而判斷的寫(xiě)數(shù)據(jù)完畢的條件是 buffer 中沒(méi)有剩余且pending為false 。

  • 當(dāng)寫(xiě)數(shù)據(jù)完畢后,取消傳輸層對(duì) OP_WRITE 事件的監(jiān)聽(tīng),完成一次寫(xiě)操作。
  • 將 send 賦值給結(jié)果集 result。
  • 此時(shí)讀完后將 send 清空,以便下次寫(xiě)。
  • 最后返回結(jié)果集 result,完成一次寫(xiě)操作。
  • 最后我們來(lái)聊聊事件注冊(cè)和取消的具體時(shí)機(jī),以便更好的理解網(wǎng)絡(luò) I/O 操作。

    06 事件注冊(cè)與取消時(shí)機(jī)

    我們知道 Java NIO 是基于 epoll 模型來(lái)實(shí)現(xiàn)的。所有基于 epoll 的框架,都有3個(gè)階段:

  • 注冊(cè)事件(OP_CONNECT, OP_ACCEPT, OP_READ, OP_WRITE)。
  • 輪詢網(wǎng)絡(luò)I/O是否就緒。
  • 執(zhí)行實(shí)際網(wǎng)絡(luò)I/O操作。
  • 這里我們來(lái)看下相關(guān)事件是何時(shí)被注冊(cè)和取消的。

    06.1 OP_CONNECT 事件

    06.1.1 OP_CONNECT 事件注冊(cè)時(shí)機(jī)

    在 Selector 發(fā)起網(wǎng)絡(luò)連接的時(shí)候進(jìn)行「 OP_CONNECT 」事件注冊(cè)。

    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { SocketChannel socketChannel = SocketChannel.open(); SelectionKey key = null; try { // 注冊(cè) OP_CONNECT 到 selector 上 key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT); } catch (IOException | RuntimeException e){}}

    06.1.2 OP_CONNECT 事件取消時(shí)機(jī)

    在 PlainTransportLayer 明文傳輸層完成連接的時(shí)候取消 「 OP_CONNECT 」事件。

    public boolean finishConnect() throws IOException { // 刪除連接事件,添加讀事件 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);}

    06.2 OP_READ 事件

    06.2.1 OP_READ 事件注冊(cè)時(shí)機(jī)

    從上面也可以看出,「 OP_READ 」事件的注冊(cè)和「 OP_CONNECT 」事件的取消是同時(shí)進(jìn)行的。

    06.2.2 OP_READ 事件取消時(shí)機(jī)

    由于 「 OP_READ 」事件是要一直監(jiān)聽(tīng)是否有新數(shù)據(jù)到來(lái),所以不會(huì)取消。并且因?yàn)槭?Java NIO 使用的 「 epoll 的 LT 模式 」,只要「 讀緩沖區(qū) 」有數(shù)據(jù),就會(huì)一直觸發(fā)。

    06.3 OP_WRITE 事件

    06.3.1 OP_WRITE 事件注冊(cè)時(shí)機(jī)

    在 KafkaChannel 真正發(fā)送網(wǎng)絡(luò)請(qǐng)求之前注冊(cè)「 OP_WRITE 」事件。

    public void setSend(Send send) { // 調(diào)用傳輸層增加寫(xiě)事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}

    06.3.2 OP_WRITE 事件取消時(shí)機(jī)

    public Send maybeCompleteSend() { if (send != null && send.completed()) { //完成一次發(fā)送后取消 OP_WRITE 事件 transportLayer.removeInterestOps(SelectionKey.OP_WRITE); }}

    06.4 事件總結(jié)

  • 對(duì)于不同事件類(lèi)型的「 事件就緒 」:
  • OP_READ事件就緒:即當(dāng)有新數(shù)據(jù)到來(lái),需要去讀取。由于是基于 LT 模式,只要讀緩沖區(qū)有數(shù)據(jù),會(huì)一直觸發(fā)。
  • OP_WRITE事件就緒:即本地 socketchannel 緩沖區(qū)有沒(méi)有寫(xiě)滿。如果沒(méi)有寫(xiě)滿的話,就會(huì)一直觸發(fā)寫(xiě)事件。所以要避免「 寫(xiě)的死循環(huán) 」問(wèn)題,寫(xiě)完就要取消寫(xiě)事件。
  • OP_CONNECT事件就緒: 即 connect 連接完成。
  • OP_ACCEPT事件就緒:即有新的連接進(jìn)來(lái),調(diào)用 accept處理。
  • 不同類(lèi)型事件處理方式是不一樣的:
  • OP_CONNECT事件:注冊(cè)1次,連接成功之后,就取消了。有且僅有1次。
  • OP_READ事件:注冊(cè)之后不取消,一直監(jiān)聽(tīng)。
  • OP_WRITE事件:每調(diào)用一次send,注冊(cè)1次。send成功,取消注冊(cè)。
  • 07 總結(jié)

    這里,我們一起來(lái)總結(jié)一下這篇文章的重點(diǎn)。

    1、帶你先整體的梳理了 Kafka 對(duì) Java NIO 封裝的組件以及調(diào)用關(guān)系圖。

    2、分別帶你梳理了傳輸層 TransportLayer 的明文網(wǎng)絡(luò)傳輸層的實(shí)現(xiàn)、網(wǎng)絡(luò)讀操作 NetworkReceive、網(wǎng)絡(luò)寫(xiě)操作 NetworkSend 的實(shí)現(xiàn)、以及 KafkaChannel 是如何進(jìn)一步對(duì)上面組件進(jìn)行封裝提供更加友好的網(wǎng)絡(luò)連接、讀寫(xiě)操作的。

    3、最后剖析了網(wǎng)絡(luò) I/O 操作過(guò)程中的事件注冊(cè)和取消時(shí)機(jī)。

    鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場(chǎng),版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
    用戶投稿
    上一篇 2022年6月23日 06:23
    下一篇 2022年6月23日 06:23

    相關(guān)推薦

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息