在上一篇中,主要帶大家深度剖析了「 生產(chǎn)者元數(shù)據(jù) 」的拉取、管理全流程,今天我們就來聊聊 Kafka 是如何對 Java NIO 進行封裝的 ,本系列總共分為3篇,主要剖析以下幾個問題:
本篇只討論前3個問題,剩余的放到后2篇中。
認真讀完這篇文章,我相信你會對 Kafka 封裝 Java NIO 源碼有更加深刻的理解。
這篇文章干貨很多,希望你可以耐心讀完。
01 總體概述
上篇剖析了「 生產(chǎn)者元數(shù)據(jù)的拉取和管理的全過程 」,此時發(fā)送消息的時候就有了元數(shù)據(jù),但是還沒有進行網(wǎng)絡通信,而網(wǎng)絡通信是一個相對復雜的過程,對于 Java 系統(tǒng)來說網(wǎng)絡通信一般會采用 NIO 庫來實現(xiàn),所以 Kafka 對 Java NIO 封裝了統(tǒng)一的框架,來實現(xiàn)多路復用的網(wǎng)絡 I/O 操作 。
為了方便大家理解,所有的源碼只保留骨干。
02 Kafka 對 Java NIO 的封裝
如果大家對 Java NIO 不了解的話,可以看下這個文檔,這里就不過多介紹了。
https://pdai.tech/md/java/io/java-io-nio.html
我們來看看 Kafka 對 Java NIO 組件做了哪些封裝? 這里先說下結果,后面會深度剖析。
接下來我們挨個對上面組件進行剖析。
02 TransportLayer 封裝過程
TransportLayer 接口是對 NIO 中 「 SocketChannel 」 的封裝。它的實現(xiàn)類總共有 2 個:
本篇只剖析 PlaintextTransportLayer 的實現(xiàn)。
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.javapublic class PlaintextTransportLayer implements TransportLayer { // java nio 中 SelectionKey 事件 private final SelectionKey key; // java nio 中的SocketChannel private final SocketChannel socketChannel; // 安全相關 private final Principal principal = KafkaPrincipal.ANONYMOUS; // 初始化 public PlaintextTransportLayer(SelectionKey key) throws IOException { // 對 NIO 中 SelectionKey 類的對象引用 this.key = key; // 對 NIO 中 SocketChannel 類的對象引用 this.socketChannel = (SocketChannel) key.channel(); }}
從上面代碼可以看出,該類就是 對底層 NIO 的 socketChannel 封裝引用 。將構造函數(shù)的 SelectionKey 類對象賦值給 key,然后從 key 中取出對應的 SocketChannel 賦值給 socketChannel,這樣就完成了初始化工作。
接下來,我們看看幾個重要方法是如何使用這2個 NIO 組件的。
02.1 finishConnect()
@Override// 判斷網(wǎng)絡連接是否完成public boolean finishConnect() throws IOException { // 1. 調(diào)用socketChannel的finishConnect方法,返回該連接是否已經(jīng)連接完成 boolean connected = socketChannel.finishConnect(); // 2. 如果網(wǎng)絡連接完成以后就刪除對OP_CONNECT事件的監(jiān)聽,同時添加對OP_READ事件的監(jiān)聽 if (connected) // 事件操作 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); // 3. 最后返回網(wǎng)絡連接 return connected;}
該方法主要用來 判斷網(wǎng)絡連接是否完成 ,如果完成就關注 「 OP_READ 」 事件,并取消 「 OP_CONNECT 」 事件。
二進制位運算事件監(jiān)聽
這里通過「 二進制位運算 」巧妙的解決了網(wǎng)絡事件的監(jiān)聽操作,實現(xiàn)非常經(jīng)典。
通過 socketChannel 在 Selector 多路復用器注冊事件返回 SelectionKey ,SelectionKey 的類型包括:
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
首先” “符號代表按位取反,”&”代表按位取與,通過 key.interestOps() 獲取當前的事件,然后和 OP_CONNECT事件取反「 11110111 」 后按位與操作。
所以,”& xx” 代表刪除 xx 事件, 有就刪除,沒有就不變 ;而 “| xx” 代表將 xx 事件添加進去。
02.2 read()
@Overridepublic int read(ByteBuffer dst) throws IOException { // 調(diào)用 NIO 的通道實現(xiàn)數(shù)據(jù)的讀取 return socketChannel.read(dst);}
該方法主要用來 把 socketChannel 里面的數(shù)據(jù)讀取緩沖區(qū) ByteBuffer 里 ,通過調(diào)用 socketChannel.read() 實現(xiàn)。
02.3 write()
@Overridepublic int write(ByteBuffer src) throws IOException { return socketChannel.write(src);}
該方法主要用來 把緩沖區(qū) ByteBuffer 的數(shù)據(jù)寫到 SocketChannel 里 ,通過調(diào)用 socketChannel.write() 實現(xiàn)。
大家都知道在網(wǎng)絡編程中,一次讀寫操作并一定能把數(shù)據(jù)讀寫完,所以就需要判斷是否讀寫完成,勢必會涉及數(shù)據(jù)的「 拆包 」、「 粘包 」操作。 這些操作比較繁瑣,因此 Kafka 將 ByteBuffer 的讀寫操作進行重新封裝,分別對應 NetworkReceive 讀操作、NetworkSend 寫操作,對于上層調(diào)用無需判斷是否讀寫完成,更加友好 。
接下來我們就來分別剖析下這2個類的實現(xiàn)。
03 NetworkReceive 封裝過程
public class NetworkReceive implements Receive { …. // 空 ByteBuffer private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); private final String source; // 存儲響應消息數(shù)據(jù)長度 private final ByteBuffer size; // 響應消息數(shù)據(jù)的最大長度 private final int maxSize; // ByteBuffer 內(nèi)存池 private final MemoryPool memoryPool; // 已讀取字節(jié)大小 private int requestedBufferSize = -1; // 存儲響應消息數(shù)據(jù)體 private ByteBuffer buffer; // 初始化構造函數(shù) public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) { this.source = source; // 分配4個字節(jié)大小的數(shù)據(jù)長度 this.size = ByteBuffer.allocate(4); this.buffer = null; // 能接收消息的最大長度 this.maxSize = maxSize; this.memoryPool = memoryPool; }}
從屬性可以看出,包含2個 ByteBuffer,分別是 size 和 buffer。這里重點說下源碼中的 size字段 的初始化。通過長度編碼方式實現(xiàn),上來就先分配了 4字節(jié) 大小的 ByteBuffer 來存儲響應消息數(shù)據(jù)長度,即32位,與 Java int 占用相同的字節(jié)數(shù),完全滿足表示消息長度的值。
介紹完字段后,我們來深度剖析下該類的幾個重要的方法。
03.1 readFrom()
public long readFrom(ScatteringByteChannel channel) throws IOException { // 讀取數(shù)據(jù)總大小 int read = 0; // 1.判斷響應消息數(shù)據(jù)長度的 ByteBuffer 是否讀完 if (size.hasRemaining()) { // 2.還有剩余,直接讀取消息數(shù)據(jù)的長度 int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); // 3.每次讀取后,累加到總讀取數(shù)據(jù)大小里 read += bytesRead; // 4.判斷響應消息數(shù)據(jù)長度的緩存是否讀完了 if (!size.hasRemaining()) { // 5.重置position size.rewind(); // 6.讀取響應消息數(shù)據(jù)長度 int receiveSize = size.getInt(); // 7.如果有異常就拋出 if (receiveSize maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); // 8.將讀到數(shù)據(jù)長度賦值已讀取字節(jié)大小,即數(shù)據(jù)體的大小 requestedBufferSize = receiveSize; if (receiveSize == 0) { buffer = EMPTY_BUFFER; } } } // 9.如果數(shù)據(jù)體buffer還沒有分配,且響應消息數(shù)據(jù)頭已讀完 if (buffer == null && requestedBufferSize != -1) { // 10.分配requestedBufferSize字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體buffer buffer = memoryPool.tryAllocate(requestedBufferSize); if (buffer == null) log.trace("Broker low on memory – could not allocate buffer of size {} for source {}", requestedBufferSize, source); } // 11.判斷buffer是否分配成功 if (buffer != null) { // 12.把channel里的數(shù)據(jù)讀到buffer中 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); // 13.累計讀取數(shù)據(jù)總大小 read += bytesRead; } // 14. 返回總大小 return read;}
該方法主要用來 把對應 channel 中的數(shù)據(jù)讀到 ByteBuffer 中 ,包括響應消息數(shù)據(jù)長度的 size 和響應消息數(shù)據(jù)體長度的 buffer,可能會被多次調(diào)用,每次都需要判斷 size 和 buffer 的狀態(tài)并讀取。
在讀取時,先讀取4字節(jié)到 size 中,再根據(jù) size 的大小為 buffer 分配內(nèi)存,然后讀滿整個 buffer 時就表示讀取完成了。
通過短短的30行左右代碼就解決了工業(yè)級「 拆包 」 、「 粘包 」 問題,相當?shù)慕?jīng)典 。
如果要解決「 粘包 」問題,就是在每個響應數(shù)據(jù)中間插入一個特殊的字節(jié)大小的「 分隔符 」,這里就在響應消息體前面插入4個字節(jié),代表響應消息自己本身的數(shù)據(jù)大小,如下圖所示:
具體「 拆包 」的操作步驟如下:
03.2 complete()
@Overridepublic boolean complete() { // 響應消息頭已讀完 && 響應消息體已讀完 return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}
該方法主要用來判斷是否都讀取完成, 即響應頭大小和響應體大小都讀取完 。
03.3 size()
// 返回大小public int size() { return payload().limit() + size.limit();}public ByteBuffer payload() { return this.buffer;}
該方法主要用來返回 響應頭和響應體還有多少數(shù)據(jù)需要讀出 。
此時已經(jīng)剖析完讀 Buffer 的封裝,接下來我們看看寫 Buffer。
04 NetworkSend 封裝過程
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.javahttps://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.javahttps://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Send.java
調(diào)用關系圖如下:
04.1 Send 接口
我們先看一下接口 Send 都定義了哪些方法。
public interface Send { // 要把數(shù)據(jù)寫入目標的 channel id String destination(); // 要發(fā)送的數(shù)據(jù)是否發(fā)送完了 boolean completed(); // 把數(shù)據(jù)寫到對應 channel 中 long writeTo(GatheringByteChannel channel) throws IOException; // 發(fā)送數(shù)據(jù)的大小 long size();}
Send 作為要發(fā)送數(shù)據(jù)的接口, 子類 ByteBufferSend 實現(xiàn) complete() 方法用于判斷是否已經(jīng)發(fā)送完成,實現(xiàn) writeTo() 方法來實現(xiàn)寫入數(shù)據(jù)到Channel中。
04.2 ByteBufferSend 類
ByteBufferSend 類實現(xiàn)了 Send 接口, 即實現(xiàn)了數(shù)據(jù)從 ByteBuffer 數(shù)組發(fā)送到 channel :
public class ByteBufferSend implements Send { private final String destination; // 總共要寫多少字節(jié)數(shù)據(jù) private final int size; // 用于寫入channel里的ByteBuffer數(shù)組,說明kafka一次最大傳輸字節(jié)是有限定的 protected final ByteBuffer[] buffers; // 總共還剩多少字節(jié)沒有寫完 private int remaining; private boolean pending = false; public ByteBufferSend(String destination, ByteBuffer… buffers) { this.destination = destination; this.buffers = buffers; for (ByteBuffer buffer : buffers) remaining += buffer.remaining(); // 計算需要寫入字節(jié)的總和 this.size = remaining; }}
我們來看下這個類中的幾個重要字段:
介紹完字段后,我們來深度剖析下該類的幾個重要的方法。
04.2.1 writeTo()
@Override// 將字節(jié)流數(shù)據(jù)寫入到channel中public long writeTo(GatheringByteChannel channel) throws IOException { // 1.調(diào)用nio底層write方法把buffers寫入傳輸層返回寫入的字節(jié)數(shù) long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); // 2.計算還剩多少字節(jié)沒有寫入傳輸層 remaining -= written; // 每次發(fā)送 都檢查是否 pending = TransportLayers.hasPendingWrites(channel); return written;}
該方法主要用來 把 buffers 數(shù)組寫入到 SocketChannel里 ,因為在網(wǎng)絡編程中,寫一次不一定可以完全把數(shù)據(jù)都寫成功,所以調(diào)用底層 channel.write(buffers) 方法會返回「 已經(jīng)寫入成功多少字節(jié) 」的返回值,這樣調(diào)用一次后就知道已經(jīng)寫入多少字節(jié)了。
04.2.2 some other
@Overridepublic String destination() { // 返回對應的channel id return destination;}@Overridepublic boolean completed() { // 判斷是否完成 即沒有剩余&pending=false return remaining <= 0 && !pending;}/** * always returns false as there will be not be any * pending writes since we directly write to socketChannel. */@Overridepublic boolean hasPendingWrites() { // 在PLAINTEXT下 pending 始終為 false return false;}@Overridepublic long size() { // 返回寫入字節(jié)的總和 return this.size;}
04.3 NetworkSend 類
NetworkSend 類繼承了 ByteBufferSend 類,真正用來寫 Buffer。
public class NetworkSend extends ByteBufferSend { // 實例化 public NetworkSend(String destination, ByteBuffer buffer) { // 調(diào)用父類的方法初始化 super(destination, sizeBuffer(buffer.remaining()), buffer); } // 用來構造4個字節(jié)的 sizeBuffer private static ByteBuffer sizeBuffer(int size) { // 先分配一個4個字節(jié)的ByteBuffer ByteBuffer sizeBuffer = ByteBuffer.allocate(4); // 寫入size長度值 sizeBuffer.putInt(size); // 重置 position sizeBuffer.rewind(); // 返回 sizeBuffer return sizeBuffer; }}
該類相對簡單些,就是構建一個發(fā)往 channel 對應的節(jié)點 id 的消息數(shù)據(jù),它的實例化過程如下:
另外 ByteBuffer[] 為兩個 buffer,可以理解為一個消息頭 buffer 即 size,一個消息體 buffer。消息頭 buffer 的長度為4byte,存放的是消息體 buffer 的長度。而消息體 buffer 是上層傳入的業(yè)務數(shù)據(jù),所以 send 就是持有一個待發(fā)送的 ByteBuffer 。
接下來我們來看看 KafkaChannel 是如何對上面幾個類進行封裝的。
05 KafkaChannel 封裝過程
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.javapublic class KafkaChannel implements AutoCloseable { …. // 節(jié)點 id private final String id; // 傳輸層對象 private final TransportLayer transportLayer; …. // 最大能接收請求的字節(jié)數(shù) private final int maxReceiveSize; // 內(nèi)存池,用來分配指定大小的 ByteBuffer private final MemoryPool memoryPool; // NetworkReceive 類的實例 private NetworkReceive receive; // NetworkSend 類的實例 private Send send; // 是否關閉連接 private boolean disconnected; …. // 連接狀態(tài) private ChannelState state; // 需要連接的遠端地址 private SocketAddress remoteAddress; // 初始化 public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator,int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) { this.id = id; this.transportLayer = transportLayer; this.authenticatorCreator = authenticatorCreator; this.authenticator = authenticatorCreator.get(); this.networkThreadTimeNanos = 0L; this.maxReceiveSize = maxReceiveSize; this.memoryPool = memoryPool; this.metadataRegistry = metadataRegistry; this.disconnected = false; this.muteState = ChannelMuteState.NOT_MUTED; this.state = ChannelState.NOT_CONNECTED; }}
我們來看下這個類中的幾個重要字段:
從屬性可以看出, 有3個最重要的成員變量:TransportLayer、NetworkReceive、Send 。KafkaChannel 通過 TransportLayer 進行讀寫操作,NetworkReceive 用來讀取,Send 用來寫出。
為了封裝普通和加密的Channel「 TransportLayer根據(jù)網(wǎng)絡協(xié)議的不同,提供不同的子類 」而對于 KafkaChannel 提供統(tǒng)一的接口,「 這是策略模式很好的應用 」。
介紹完字段后,我們來深度剖析下其 網(wǎng)絡讀寫操作 是如何實現(xiàn)的?
05.1 setSend()
public void setSend(Send send) { if (this.send != null) throw new IllegalStateException(“Attempt to begin a send operation with prior send operation still in progress, connection id is ” + id); // 設置要發(fā)送消息的字段 this.send = send; // 調(diào)用傳輸層增加寫事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}// PlaintextTransportLayer 類方法@Overridepublic void addInterestOps(int ops) { //通過 key.interestOps() | ops 來添加事件 key.interestOps(key.interestOps() | ops);}
該方法主要用來 預發(fā)送,即在發(fā)送網(wǎng)絡請求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中 ,然后調(diào)用傳輸層方法增加對這個 channel 上「 OP_WRITE 」事件的關注。當真正執(zhí)行發(fā)送的時候,會從 send 中讀取數(shù)據(jù)。
05.2 write()
public long write() throws IOException { // 判斷 send 是否為空,如果為空表示已經(jīng)發(fā)送完畢了 if (send == null) return 0; midWrite = true; // 調(diào)用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去 return send.writeTo(transportLayer);}
該方法主要用來 把保存在 send 上的數(shù)據(jù)真正發(fā)送出去 。
05.3 read()
public long read() throws IOException { // 如果receive為空表示數(shù)據(jù)已經(jīng)讀完,需要重新實例化對象 if (receive == null) { // 確保分配了 NetworkReceive receive = new NetworkReceive(maxReceiveSize, id, memoryPool); } //如果未讀完,嘗試讀取該對象 long bytesReceived = receive(this.receive); if (this.receive.requiredMemoryAmountKnown() && !this.receive.memoryAllocated() && isInMutableState()) { //pool must be out of memory, mute ourselves. mute(); } return bytesReceived;}
該方法主要用來 把從網(wǎng)絡I/O操作中讀出的數(shù)據(jù)保存到 NetworkReceive 中 。
05.4 maybeCompleteReceive()
public NetworkReceive maybeCompleteReceive() { if (receive != null && receive.complete()) { receive.payload().rewind(); NetworkReceive result = receive; receive = null; return result; } return null;}// NetworkReceivepublic boolean complete() { return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}
該方法主要用來 判斷數(shù)據(jù)已經(jīng)讀取完畢了 ,而判斷是否讀完的條件是 NetworkReceive 里的 buffer 是否用完 ,包括上面說過的表示響應消息頭 size ByteBuffer 和響應消息體本身的 buffer ByteBuffer。這兩個都讀完才算真正讀完了。
05.5 maybeCompleteSend()
// 可能完成發(fā)送public Send maybeCompleteSend() { if (send != null && send.completed()) { midWrite = false; transportLayer.removeInterestOps(SelectionKey.OP_WRITE); Send result = send; send = null; return result; } return null;}// PlaintextTransportLayer 類方法@Overridepublic void removeInterestOps(int ops) { // 通過 key.interestOps() & ~ops 來刪除事件 key.interestOps(key.interestOps() & ~ops);}// ByteBufferSend@Overridepublic boolean completed() { return remaining <= 0 && !pending;}
該方法主要用來 是否寫數(shù)據(jù)完畢了 ,而判斷的寫數(shù)據(jù)完畢的條件是 buffer 中沒有剩余且pending為false 。
最后我們來聊聊事件注冊和取消的具體時機,以便更好的理解網(wǎng)絡 I/O 操作。
06 事件注冊與取消時機
我們知道 Java NIO 是基于 epoll 模型來實現(xiàn)的。所有基于 epoll 的框架,都有3個階段:
這里我們來看下相關事件是何時被注冊和取消的。
06.1 OP_CONNECT 事件
06.1.1 OP_CONNECT 事件注冊時機
在 Selector 發(fā)起網(wǎng)絡連接的時候進行「 OP_CONNECT 」事件注冊。
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { SocketChannel socketChannel = SocketChannel.open(); SelectionKey key = null; try { // 注冊 OP_CONNECT 到 selector 上 key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT); } catch (IOException | RuntimeException e){}}
06.1.2 OP_CONNECT 事件取消時機
在 PlainTransportLayer 明文傳輸層完成連接的時候取消 「 OP_CONNECT 」事件。
public boolean finishConnect() throws IOException { // 刪除連接事件,添加讀事件 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);}
06.2 OP_READ 事件
06.2.1 OP_READ 事件注冊時機
從上面也可以看出,「 OP_READ 」事件的注冊和「 OP_CONNECT 」事件的取消是同時進行的。
06.2.2 OP_READ 事件取消時機
由于 「 OP_READ 」事件是要一直監(jiān)聽是否有新數(shù)據(jù)到來,所以不會取消。并且因為是 Java NIO 使用的 「 epoll 的 LT 模式 」,只要「 讀緩沖區(qū) 」有數(shù)據(jù),就會一直觸發(fā)。
06.3 OP_WRITE 事件
06.3.1 OP_WRITE 事件注冊時機
在 KafkaChannel 真正發(fā)送網(wǎng)絡請求之前注冊「 OP_WRITE 」事件。
public void setSend(Send send) { // 調(diào)用傳輸層增加寫事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}
06.3.2 OP_WRITE 事件取消時機
public Send maybeCompleteSend() { if (send != null && send.completed()) { //完成一次發(fā)送后取消 OP_WRITE 事件 transportLayer.removeInterestOps(SelectionKey.OP_WRITE); }}
06.4 事件總結
07 總結
這里,我們一起來總結一下這篇文章的重點。
1、帶你先整體的梳理了 Kafka 對 Java NIO 封裝的組件以及調(diào)用關系圖。
2、分別帶你梳理了傳輸層 TransportLayer 的明文網(wǎng)絡傳輸層的實現(xiàn)、網(wǎng)絡讀操作 NetworkReceive、網(wǎng)絡寫操作 NetworkSend 的實現(xiàn)、以及 KafkaChannel 是如何進一步對上面組件進行封裝提供更加友好的網(wǎng)絡連接、讀寫操作的。
3、最后剖析了網(wǎng)絡 I/O 操作過程中的事件注冊和取消時機。