在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    Rust學習筆記(六十一)實例項目——多線程Web服務器

    在上一節(jié)中我們學習了構(gòu)建一個簡單的單線程Web服務器。它每次只能接收并處理一個請求,如果處理過程比較耗時,那么整個系統(tǒng)的吞吐率就很低。這一節(jié)我們學習用線程池來對其進行改進。

    在我們的想象中,改進后的main函數(shù)應該是這樣的:

    fn main() { let listener = TcpListener::bind(“127.0.0.1:7878”).unwrap(); let tp = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); tp.execute(|| handle_connection(stream)); }}

    應該有一個線程池ThreadPool,調(diào)用其關(guān)聯(lián)函數(shù)new可以創(chuàng)建一個自定義線程數(shù)的線程池。ThreadPool上還應該有一個execute方法,它接收閉包并傳遞給線程池內(nèi)的線程,由線程調(diào)用閉包。

    線程池§

    下一步就是完成一個基本的線程池。那線程池的數(shù)據(jù)結(jié)構(gòu)是什么樣的呢?首先線程池里應該有一系列的線程(可以存在Vec中)。為了便于管理這些線程,每個線程還應該帶有一些額外的信息,比如id。 線程池(主線程)如何管理這些線程呢?我們之前在四十九節(jié)學習過Channel進行線程間通信,那么這里就可以使用Channel對線程進行管理:

    • 線程池持有sender
    • 線程內(nèi)輪詢receiver,若收到任務就立即執(zhí)行

    暫時先考慮到這里,下面先進行編碼:

    首先將線程和線程上的額外信息抽象為一個名為Worker的結(jié)構(gòu)體

    struct Worker { id: usize, thread: Option,}

    這里使用Option來存放創(chuàng)建線程返回的JoinHandle,以便于后面停止線程池時可以取出JoinHandle調(diào)用上面的join方法等待現(xiàn)有的任務結(jié)束后再停止。下面要為其實現(xiàn)一個new方法,以便于構(gòu)造Worker。 前面說過線程內(nèi)要輪詢Channel的receiver來接收并執(zhí)行任務,所以new需要接收一個receiver。這個receiver是什么類型的呢? 首先channel在mpsc模塊下,之前學過,mpsc是多個生產(chǎn)者單個消費者的縮寫。但是這里是多個線程來消費同一個sender發(fā)送的消息,會造成數(shù)據(jù)競爭。所以需要使用Mutex來對receiver進行加鎖。但是多個線程都持有同一個receiver的話,又涉及到多線程的多重所有權(quán)。之前第五十節(jié)學過的多線程的多重所有權(quán)的知識,使用Arc可以解決。所以這里receiver的類型是Arc<Mutex>。 這里的消息應該分為兩類:

    • 線程需要執(zhí)行的正常任務
    • 線程池要停止時的終止消息

    可以使用枚舉來定義兩個Message的變體來實現(xiàn)。其中正常的任務應該是一個閉包,該閉包是傳入thread::spawn的參數(shù),通過查看thread::spawn函數(shù)的定義,發(fā)現(xiàn)該閉包的類型是FnOnce() -> T + Send + ‘static。那么就可以定義消息和消息中的任務了:

    type Job = Box;//定義類型別名省略代碼enum Message { NewJob(Job), Terminate,}

    然后定義Worker和它的new方法,Worker中的線程是這樣的:

    • 通過loop循環(huán)不斷接收消息
    • 判斷消息類型,正常任務則取出消息中的任務直接執(zhí)行
    • 若為終止消息則終止循環(huán),使該線程結(jié)束

    struct Worker { id: usize, thread: Option,}impl Worker { fn new(id: usize, receiver: Arc<Mutex>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { println!(“Worker {} got a job; executing.”, id); job(); } Message::Terminate => { println!(“Worker {} was told to terminate.”, id); break; } } }); Worker { id: id, thread: Option::Some(thread), } }}

    此處的loop為何不能寫成while let循環(huán)呢?下面是while let的代碼:

    while let Ok(job) = receiver.lock().unwrap().recv() {println!(“Worker {} got a job; executing.”, id);job();}

    因為鎖在循環(huán)塊外獲取,而while 表達式中的值在整個塊一直處于作用域中,job() 調(diào)用的過程中其仍然持有鎖,這意味著其他 worker 因無法獲取到鎖而不能接收任務。而loop循環(huán)時,我們在循環(huán)塊內(nèi)獲取鎖,lock 方法返回的 MutexGuard 在 let job 語句結(jié)束之后立刻就被丟棄了。這確保了 recv 調(diào)用過程中持有鎖,而在 job() 調(diào)用前鎖就被釋放了,這就允許并發(fā)處理多個請求了。

    Worker到此為止就實現(xiàn)完了,下面看ThreadPool的實現(xiàn):

    pub struct ThreadPool { workers: Vec, sender: mpsc::Sender,}impl ThreadPool {//接收線程數(shù)量并返回對應的線程池 pub fn new(size: usize) -> ThreadPool { assert!(size > 0);//先創(chuàng)建一個size大小的Vector let mut workers = Vec::with_capacity(size);//創(chuàng)建channel的sender和receiver let (sender, receiver) = mpsc::channel();//創(chuàng)建帶鎖的receiver的原子引用 let receiver = Arc::new(Mutex::new(receiver));//創(chuàng)建相應數(shù)量的Worker,并把對應的id和receiver傳入其中 for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); }//返回線程池 ThreadPool { workers: workers, sender: sender, } }//有任務時直接把任務通過sender發(fā)送到對應的receiver pub fn execute(&self, f: F) where F: FnOnce() + Send + ‘static, { self.sender.send(Message::NewJob(Box::new(f))).unwrap(); }}

    main.rs中的handle_connection函數(shù)與上一節(jié)中的一致。cargo run運行,然后同時發(fā)送多個請求(可用jmeter或其它工具),輸出:

    Worker 0 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 0 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 0 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 0 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 0 got a job; executing.Worker 2 got a job; executing.

    發(fā)現(xiàn)確實有4個線程在執(zhí)行任務。

    為線程池實現(xiàn)Drop trait§

    前面的main函數(shù):

    fn main() { let listener = TcpListener::bind(“127.0.0.1:7878”).unwrap(); let tp = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); tp.execute(|| handle_connection(stream)); }}

    如果main函數(shù)執(zhí)行完畢,一些變量就走出作用域,其中包括我們的ThreadPool,所以我們需要為其實現(xiàn)Drop trait。走出作用域時,等待現(xiàn)有的任務執(zhí)行完再清理。但是在此之前需要先向各線程發(fā)出終止消息,讓其跳出死循環(huán)。因為如果不跳出死循環(huán),線程池中的線程就會一直陷在死循環(huán)里,而主線程會一直等待其執(zhí)行完。

    實現(xiàn)Drop trait:

    impl Drop for ThreadPool { fn drop(&mut self) { for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } println!(“Shutting down all workers.”); for worker in &mut self.workers { println!(“Shutting down worker {}”, worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } }}

    為了更好的理解為什么需要兩個分開的循環(huán),想象一下只有兩個 worker 的場景。如果在一個單獨的循環(huán)中遍歷每個 worker,在第一次迭代中向通道發(fā)出終止消息并對第一個 worker 線程調(diào)用 join。如果此時第一個 worker 正忙于處理請求,那么第二個 worker 會收到終止消息并停止。我們會一直等待第一個 worker 結(jié)束,不過它永遠也不會結(jié)束因為第二個線程接收了終止消息。死鎖

    鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
    上一篇 2022年6月30日 20:05
    下一篇 2022年6月30日 20:05

    相關(guān)推薦

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時間:周一至周五,10:30-18:30,節(jié)假日休息