A summary of Rust high-concurrency programming

beyondma 2021-10-14 04:50:06

The concept of Serverless is on fire. The industry no longer debates whether to adopt Serverless; the slogan now is "Serverless First," and everyone is rushing to embrace it. Note that having no servers is not the essence of Serverless: the real win is being able to work efficiently without having to care about servers at all. Internet traffic rises and falls sharply, and even technology giants have been brought down by sudden spikes. When Bilibili (the "B station") crashed a few months ago, I wrote an article, "B station's front end collapsed, back end don't panic," analyzing the incident. Serverless, with its rapid automatic scaling, can absorb this kind of shock calmly, which is why this new technology has become all the rage.

Behind the Serverless buzz, Rust seems to have firmly taken center stage. In fact, under the topic of high concurrency there are many patterns and idioms worth summarizing, especially around a professional framework like Tokio, which is a great help for writing high-performance programs. This article therefore supplements and summarizes the Tokio-related points introduced before.

What exactly is a Future?

Simply put, a Future is not a value; it is a type representing a value that will only become available at some point in the future. A Future must implement the std::future::Future trait from the Rust standard library, and its Output is the value that is produced only once the Future completes. In Rust, the executor drives a Future forward by calling Future::poll. A Future is essentially a state machine, and Futures can be nested. Consider the following example: in the main function we instantiate MainFuture and call .await on it. Besides transitioning between several states, MainFuture also polls an inner Delay Future, demonstrating Future nesting.

MainFuture starts in the State0 state. When the scheduler calls its poll method, MainFuture tries to advance its state as far as possible. If the Future completes, it returns Poll::Ready. If MainFuture is not finished because the inner Delay Future has not yet reached the Ready state, it returns Poll::Pending. On receiving Pending, the scheduler puts MainFuture back into the run queue and will call poll again later to advance the Future. The code is as follows:


use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
    when: Instant,
}
impl Future for Delay {
    type Output = &'static str;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Wake immediately so the executor polls again right away;
            // this degenerates into busy polling (improved later in the article).
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
enum MainFuture {
    
    State0,
    State1(Delay),
    Terminated,
}
impl Future for MainFuture {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use MainFuture::*;
      
        loop {
            match *self {
                State0 => {
                    let when = Instant::now() +
                        Duration::from_millis(1);
                    let future = Delay { when };
                    println!("init status");
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            println!("delay finished this future is ready");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            println!("not ready");
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}
#[tokio::main]
async fn main() {
    let main_future = MainFuture::State0;
    main_future.await;
}

Of course, this Future implementation has an obvious problem. As the output shows, poll is executed many times while the Future still has to wait. Ideally, poll should run only when the Future can actually make progress; constantly waking ourselves and returning Pending degenerates into inefficient busy polling.

The solution lies in the Context parameter of the poll function, which carries the Future's waker(). Calling the waker signals the executor that this task should be polled. When the Future's state can advance, we call wake to notify the executor; that is the correct approach. So we change part of Delay's code:

let waker = cx.waker().clone();
let when = self.when;
// Spawn a timer thread that wakes the task only once the
// deadline has actually passed.
thread::spawn(move || {
    let now = Instant::now();
    if now < when {
        thread::sleep(when - now);
    }
    waker.wake();
});

No matter which high-concurrency framework you pick, it is in essence a scheduler built on this task/poll mechanism: the essential job of poll is to check the completion status of the tasks ahead in the chain.
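To make the task/poll loop concrete, here is a minimal, std-only sketch of an executor in the spirit of the mechanism described above. The names block_on, ThreadWaker, and this variant of Delay are my own illustrative choices, not Tokio APIs: the executor parks its thread on Pending, and a timer thread calls wake() (which unparks it) once the deadline passes, so there is no busy polling.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

// A waker that unparks the thread running the executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal executor: poll the future, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // sleep until wake() is called
        }
    }
}

// A Delay that wakes the executor from a timer thread instead of
// waking itself on every poll.
struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            Poll::Ready("done")
        } else {
            let waker = cx.waker().clone();
            let when = self.when;
            thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    thread::sleep(when - now);
                }
                waker.wake();
            });
            Poll::Pending
        }
    }
}

fn main() {
    let out = block_on(Delay {
        when: Instant::now() + Duration::from_millis(10),
    });
    assert_eq!(out, "done");
    println!("{}", out);
}
```

Even this toy executor never spins: between the first poll and the wake-up it is parked, which is exactly the improvement the waker mechanism buys over the busy-polling version above.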


Used well, this poll mechanism avoids the kind of scheduling algorithm that periodically traverses the entire event queue in an event loop: the essence of poll is that an event whose state becomes ready notifies its corresponding handler. With a poll-based framework such as Tokio, programmers don't have to care about the message passing underneath; they only need methods like and_then and spawn to build the task chain and let the system run.

Implementing data frames

A frame is the smallest unit of data transmission; bytes below frame granularity are meaningless to the application, and incomplete frames should be filtered out at the framing layer. The read_frame method waits until an entire frame has been received before returning. A single call to TcpStream::read() can return any amount of data: a whole frame, a partial frame, or multiple frames. If a partial frame is received, the data is buffered and more data is read from the socket. If multiple frames are received, the first frame is returned and the rest is buffered until the next call to read_frame. To achieve this, Connection needs a read buffer field. Data is read from the socket into the read buffer; when a frame is parsed, the corresponding data is removed from the buffer. We will use BytesMut as the buffer type, together with a cursor tracking how many bytes have been filled.

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
    // How many bytes of `buffer` are filled so far;
    // read_frame below reads into buffer[cursor..].
    cursor: usize,
}
impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        // Start with a zero-filled 4kb buffer.
        let mut buffer = BytesMut::with_capacity(4096);
        buffer.resize(4096, 0);
        Connection {
            stream,
            buffer,
            cursor: 0,
        }
    }
}

The read_frame function tries to parse a frame. If there is enough buffered data to parse one, the frame is returned to read_frame()'s caller. Otherwise, it tries to read more data from the socket into the buffer and then calls parse_frame() again; with more data, parsing may now succeed. When reading from the stream, a return value of 0 indicates that the peer will send no more data. If the read buffer still contains data at that point, a partial frame has been received and the connection is terminating abruptly. This is an error condition, and Err is returned.

use mini_redis::{Frame, Result};
use tokio::io::AsyncReadExt;

impl Connection {
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        loop {
            if let Some(frame) = self.parse_frame()? {
                return Ok(Some(frame));
            }
            // Ensure the buffer has capacity
            if self.buffer.len() == self.cursor {
                // Grow the buffer
                self.buffer.resize(self.cursor * 2, 0);
            }
            // Read into the buffer, tracking the number
            // of bytes read
            let n = self.stream.read(
                &mut self.buffer[self.cursor..]).await?;
            if 0 == n {
                if self.cursor == 0 {
                    return Ok(None);
                } else {
                    return Err("connection reset by peer".into());
                }
            } else {
                // Update our cursor
                self.cursor += n;
            }
        }
    }
}
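The buffering logic above can be illustrated without a socket. Below is a self-contained sketch (my own illustration, not mini-redis code) of a length-prefixed framing layer: each frame is one length byte followed by that many payload bytes, partial frames stay buffered, and complete frames are drained from the front of the buffer. FrameBuffer, push, and next_frame are hypothetical names.

```rust
// A toy framing layer: frame = 1 length byte + payload bytes.
struct FrameBuffer {
    buf: Vec<u8>,
}

impl FrameBuffer {
    fn new() -> Self {
        FrameBuffer { buf: Vec::new() }
    }

    // Buffer whatever bytes the "socket" handed us.
    fn push(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
    }

    // Return the first complete frame, or None if only a partial
    // frame (or nothing) is buffered.
    fn next_frame(&mut self) -> Option<Vec<u8>> {
        let len = *self.buf.first()? as usize;
        if self.buf.len() < 1 + len {
            return None; // partial frame: wait for more data
        }
        // Remove the parsed frame from the front of the buffer.
        let frame = self.buf[1..1 + len].to_vec();
        self.buf.drain(..1 + len);
        Some(frame)
    }
}

fn main() {
    let mut fb = FrameBuffer::new();
    // One read may contain a whole frame plus the start of the next.
    fb.push(&[3, b'a', b'b', b'c', 2, b'x']);
    assert_eq!(fb.next_frame(), Some(b"abc".to_vec()));
    assert_eq!(fb.next_frame(), None); // second frame still incomplete
    fb.push(&[b'y']); // the rest arrives
    assert_eq!(fb.next_frame(), Some(b"xy".to_vec()));
    println!("framing ok");
}
```

This captures the same invariant as read_frame: a caller only ever sees whole frames, no matter how the bytes were split across reads.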

A note on select

Another point worth noting is select. When waiting on more than one channel, any of them may complete first. The select! macro waits on all of the channels and resolves with the value from the first one to return. Be careful: once select! gets its first result, the other, unfinished branches are cancelled. For example:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
    tokio::spawn(async {
        let _ = tx1.send("hello beyondma");
    });
    tokio::spawn(async {
        let _ = tx2.send("hi beyondma");
    });
    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Running the code above prints either

rx1 completed first with Ok("hello beyondma")

or

rx2 completed first with Ok("hi beyondma")

but it will never output both.

To explain the mechanism behind select, let's design a MySelect future of our own. When MySelect is polled, its first branch is polled first; if that branch is ready, its value is used and MySelect completes. Once MySelect.await receives Ready, the whole future is dropped. The code is as follows:


use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}
impl Future for MySelect {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }
        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }
        Poll::Pending
    }
}
#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
    // use tx1 and tx2
     tokio::spawn(async {
        let _ = tx1.send("hello beyondma");
    });
    tokio::spawn(async {
        let _ = tx2.send("hi beyondma");
    });
    MySelect {
        rx1,
        rx2,
    }.await;
}

Rust high-concurrency summary

Rust is a new language that has risen in recent years along with Serverless. On the surface it looks like C: no JVM-style virtual machine and no GC garbage collector. But look closer and it is not C. Rust does not trust programmers; it tries to have the compiler kill the errors in a program at build time, before an executable is ever produced. Because there is no GC, Rust introduces its own system of variable lifetimes and borrowing, and developers must constantly watch whether a variable's lifetime is valid.

And Rust can feel as hard as a Martian language: a channel sender must be cloned before it is moved into multiple tasks, a locked hash table must be unwrapped before use, and many idioms differ completely from Java or Go. But it is precisely these strict restrictions that prevent, in the Rust world, the goroutine pitfalls we mentioned in Go: those Go usages simply fail Rust's lifetime checks and will not compile.
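As a concrete illustration of the clone-before-move and lock-then-unwrap patterns just mentioned, here is a small std-only sketch sharing a hash map across threads (the structure is mine, not from any framework):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // A hash map shared across threads must be wrapped in Arc<Mutex<..>>.
    let map = Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            // Clone the Arc handle before moving it into the thread;
            // moving `map` itself would fail the borrow checker when
            // we try to use it again in the next iteration.
            let map = Arc::clone(&map);
            thread::spawn(move || {
                // lock() returns a Result, hence the unwrap().
                map.lock().unwrap().insert(i, i * 10);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    let map = map.lock().unwrap();
    assert_eq!(map.len(), 4);
    assert_eq!(map[&2], 20);
    println!("all threads done: {} entries", map.len());
}
```

Remove the Arc::clone or the Mutex and the program no longer compiles; that refusal at build time is exactly the strictness described above.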

Rust is therefore very hard to get started with, but once you can "graduate" and your programs compile, they are very likely correct. It is a language with a high entry threshold and a high ceiling.

At present, the most representative high-concurrency programming framework for Rust is Tokio. The Future example at the beginning of this article is written against the Tokio framework, so I won't repeat it here.

According to the official documentation, each Tokio task is only about 64 bytes in size. Compared with spawning a native thread per network request, efficiency can improve by orders of magnitude; with the help of a high-concurrency framework, developers can squeeze hardware performance to the limit.
