免费爱碰视频在线观看,九九精品国产屋,欧美亚洲尤物久久精品,1024在线观看视频亚洲

      Rust學(xué)習(xí)筆記(六十一)實(shí)例項(xiàng)目——多線程Web服務(wù)器

      在上一節(jié)中我們學(xué)習(xí)了構(gòu)建一個(gè)簡(jiǎn)單的單線程Web服務(wù)器。它每次只能接收并處理一個(gè)請(qǐng)求,如果處理過(guò)程比較耗時(shí),那么整個(gè)系統(tǒng)的吞吐率就很低。這一節(jié)我們學(xué)習(xí)用線程池來(lái)對(duì)其進(jìn)行改進(jìn)。

      在我們的想象中,改進(jìn)后的main函數(shù)應(yīng)該是這樣的:

      fn main() { let listener = TcpListener::bind(“127.0.0.1:7878”).unwrap(); let tp = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); tp.execute(|| handle_connection(stream)); }}

      應(yīng)該有一個(gè)線程池ThreadPool,調(diào)用其關(guān)聯(lián)函數(shù)new可以創(chuàng)建一個(gè)自定義線程數(shù)的線程池。ThreadPool上還應(yīng)該有一個(gè)execute方法,它接收閉包并傳遞給線程池內(nèi)的線程,由線程調(diào)用閉包。

      線程池§

      下一步就是完成一個(gè)基本的線程池。那線程池的數(shù)據(jù)結(jié)構(gòu)是什么樣的呢?首先線程池里應(yīng)該有一系列的線程(可以存在Vec中)。為了便于管理這些線程,每個(gè)線程還應(yīng)該帶有一些額外的信息,比如id。 線程池(主線程)如何管理這些線程呢?我們之前在四十九節(jié)學(xué)習(xí)過(guò)Channel進(jìn)行線程間通信,那么這里就可以使用Channel對(duì)線程進(jìn)行管理:

      • 線程池持有sender
      • 線程內(nèi)輪詢(xún)r(jià)eceiver,若收到任務(wù)就立即執(zhí)行

      暫時(shí)先考慮到這里,下面先進(jìn)行編碼:

      首先將線程和線程上的額外信息抽象為一個(gè)名為Worker的結(jié)構(gòu)體

      struct Worker { id: usize, thread: Option,}

      這里使用Option來(lái)存放創(chuàng)建線程返回的JoinHandle,以便于后面停止線程池時(shí)可以取出JoinHandle調(diào)用上面的join方法等待現(xiàn)有的任務(wù)結(jié)束后再停止。下面要為其實(shí)現(xiàn)一個(gè)new方法,以便于構(gòu)造Worker。 前面說(shuō)過(guò)線程內(nèi)要輪詢(xún)Channel的receiver來(lái)接收并執(zhí)行任務(wù),所以new需要接收一個(gè)receiver。這個(gè)receiver是什么類(lèi)型的呢? 首先channel在mpsc模塊下,之前學(xué)過(guò),mpsc是多個(gè)生產(chǎn)者單個(gè)消費(fèi)者的縮寫(xiě)。但是這里是多個(gè)線程來(lái)消費(fèi)同一個(gè)sender發(fā)送的消息,會(huì)造成數(shù)據(jù)競(jìng)爭(zhēng)。所以需要使用Mutex來(lái)對(duì)receiver進(jìn)行加鎖。但是多個(gè)線程都持有同一個(gè)receiver的話,又涉及到多線程的多重所有權(quán)。之前第五十節(jié)學(xué)過(guò)的多線程的多重所有權(quán)的知識(shí),使用Arc可以解決。所以這里receiver的類(lèi)型是Arc<Mutex>。 這里的消息應(yīng)該分為兩類(lèi):

      • 線程需要執(zhí)行的正常任務(wù)
      • 線程池要停止時(shí)的終止消息

      可以使用枚舉來(lái)定義兩個(gè)Message的變體來(lái)實(shí)現(xiàn)。其中正常的任務(wù)應(yīng)該是一個(gè)閉包,該閉包是傳入thread::spawn的參數(shù),通過(guò)查看thread::spawn函數(shù)的定義,發(fā)現(xiàn)該閉包的類(lèi)型是FnOnce() -> T + Send + ‘static。那么就可以定義消息和消息中的任務(wù)了:

      type Job = Box;//定義類(lèi)型別名省略代碼enum Message { NewJob(Job), Terminate,}

      然后定義Worker和它的new方法,Worker中的線程是這樣的:

      • 通過(guò)loop循環(huán)不斷接收消息
      • 判斷消息類(lèi)型,正常任務(wù)則取出消息中的任務(wù)直接執(zhí)行
      • 若為終止消息則終止循環(huán),使該線程結(jié)束

      struct Worker { id: usize, thread: Option,}impl Worker { fn new(id: usize, receiver: Arc<Mutex>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { println!(“Worker {} got a job; executing.”, id); job(); } Message::Terminate => { println!(“Worker {} was told to terminate.”, id); break; } } }); Worker { id: id, thread: Option::Some(thread), } }}

      此處的loop為何不能寫(xiě)成while let循環(huán)呢?下面是while let的代碼:

      while let Ok(job) = receiver.lock().unwrap().recv() {println!(“Worker {} got a job; executing.”, id);job();}

      因?yàn)殒i在循環(huán)塊外獲取,而while 表達(dá)式中的值在整個(gè)塊一直處于作用域中,job() 調(diào)用的過(guò)程中其仍然持有鎖,這意味著其他 worker 因無(wú)法獲取到鎖而不能接收任務(wù)。而loop循環(huán)時(shí),我們?cè)谘h(huán)塊內(nèi)獲取鎖,lock 方法返回的 MutexGuard 在 let job 語(yǔ)句結(jié)束之后立刻就被丟棄了。這確保了 recv 調(diào)用過(guò)程中持有鎖,而在 job() 調(diào)用前鎖就被釋放了,這就允許并發(fā)處理多個(gè)請(qǐng)求了。

      Worker到此為止就實(shí)現(xiàn)完了,下面看ThreadPool的實(shí)現(xiàn):

      pub struct ThreadPool { workers: Vec, sender: mpsc::Sender,}impl ThreadPool {//接收線程數(shù)量并返回對(duì)應(yīng)的線程池 pub fn new(size: usize) -> ThreadPool { assert!(size > 0);//先創(chuàng)建一個(gè)size大小的Vector let mut workers = Vec::with_capacity(size);//創(chuàng)建channel的sender和receiver let (sender, receiver) = mpsc::channel();//創(chuàng)建帶鎖的receiver的原子引用 let receiver = Arc::new(Mutex::new(receiver));//創(chuàng)建相應(yīng)數(shù)量的Worker,并把對(duì)應(yīng)的id和receiver傳入其中 for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); }//返回線程池 ThreadPool { workers: workers, sender: sender, } }//有任務(wù)時(shí)直接把任務(wù)通過(guò)sender發(fā)送到對(duì)應(yīng)的receiver pub fn execute(&self, f: F) where F: FnOnce() + Send + ‘static, { self.sender.send(Message::NewJob(Box::new(f))).unwrap(); }}

      main.rs中的handle_connection函數(shù)與上一節(jié)中的一致。cargo run運(yùn)行,然后同時(shí)發(fā)送多個(gè)請(qǐng)求(可用jmeter或其它工具),輸出:

      Worker 0 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 0 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 0 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 0 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 0 got a job; executing.Worker 2 got a job; executing.

      發(fā)現(xiàn)確實(shí)有4個(gè)線程在執(zhí)行任務(wù)。

      為線程池實(shí)現(xiàn)Drop trait§

      前面的main函數(shù):

      fn main() { let listener = TcpListener::bind(“127.0.0.1:7878”).unwrap(); let tp = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); tp.execute(|| handle_connection(stream)); }}

      如果main函數(shù)執(zhí)行完畢,一些變量就走出作用域,其中包括我們的ThreadPool,所以我們需要為其實(shí)現(xiàn)Drop trait。走出作用域時(shí),等待現(xiàn)有的任務(wù)執(zhí)行完再清理。但是在此之前需要先向各線程發(fā)出終止消息,讓其跳出死循環(huán)。因?yàn)槿绻惶鏊姥h(huán),線程池中的線程就會(huì)一直陷在死循環(huán)里,而主線程會(huì)一直等待其執(zhí)行完。

      實(shí)現(xiàn)Drop trait:

      impl Drop for ThreadPool { fn drop(&mut self) { for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } println!(“Shutting down all workers.”); for worker in &mut self.workers { println!(“Shutting down worker {}”, worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } }}

      為了更好的理解為什么需要兩個(gè)分開(kāi)的循環(huán),想象一下只有兩個(gè) worker 的場(chǎng)景。如果在一個(gè)單獨(dú)的循環(huán)中遍歷每個(gè) worker,在第一次迭代中向通道發(fā)出終止消息并對(duì)第一個(gè) worker 線程調(diào)用 join。如果此時(shí)第一個(gè) worker 正忙于處理請(qǐng)求,那么第二個(gè) worker 會(huì)收到終止消息并停止。我們會(huì)一直等待第一個(gè) worker 結(jié)束,不過(guò)它永遠(yuǎn)也不會(huì)結(jié)束因?yàn)榈诙€(gè)線程接收了終止消息。死鎖!

      鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場(chǎng),版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
      (0)
      用戶投稿
      上一篇 2022年6月30日 20:05
      下一篇 2022年6月30日 20:05

      相關(guān)推薦

      聯(lián)系我們

      聯(lián)系郵箱:admin#wlmqw.com
      工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息