Skip to content
On this page

常用三方库

1、cargo-watch

1.1、安装

地址: https://crates.io/crates/cargo-watch

shell
cargo install cargo-watch

1.2、使用

shell
cargo watch -q -c -w src -x run

1.3、查看帮助

shell
cargo watch --help

2、tokio

2.1、官网

https://tokio.rs/tokio/tutorial

2.2、安装

js
[dependencies]
tokio = { version = "1.0", features = ["full"] }

2.3、使用

2.3.1、async

js
use std::{future::Future, time::Duration};

use tokio::time;

/// Runs `async1` and then `async2` one after the other inside a single spawned task
/// and prints what each of them resolves to.
#[tokio::main]
async fn main() {
    let h1 = tokio::spawn(async {
        println!("async1 result: {:?}", async1().await);

        println!("async2 result is {}", async2().await);
    });
    // Await the handle directly so a panic inside the task is reported, not discarded.
    if let Err(e) = h1.await {
        eprintln!("task failed: {}", e);
    }
}

/// Waits one second, then resolves to the string `"async1"`.
async fn async1() -> String {
    let delay = Duration::from_secs(1);
    time::sleep(delay).await;
    String::from("async1")
}

/// Same shape as an `async fn`, spelled out as a plain function returning a future.
/// The future waits one second, then resolves to the string `"async2"`.
fn async2() -> impl Future<Output = String> {
    async {
        let delay = Duration::from_secs(1);
        time::sleep(delay).await;
        String::from("async2")
    }
}

2.3.2、简单示例

js
/// Creates the `say_world` future first, prints a line, and only then awaits the future.
#[tokio::main]
async fn main() {
    // Calling an async fn only builds the future.
    let pending = say_world();
    println!("hello world");
    // The future is driven to completion here.
    pending.await;
}

/// Prints `world` when awaited.
async fn say_world() {
    println!("world");
}

2.3.3、runtime

TIP

不能在异步上下文(即已有 runtime 的工作线程)中再创建或阻塞运行另一个 runtime, 否则会 panic

2.3.3.1、创建runtime
js
use tokio::runtime;
/// Builds a runtime by hand, spawns one task on it and waits for that task.
fn main() {
    let runtime = runtime::Runtime::new().expect("failed to build tokio runtime");
    let handle = runtime.spawn(async {
        println!("Hello, world!");
    });
    println!("Hello, world!22222");
    // Without this wait the runtime is dropped when `main` returns and the spawned
    // task is not guaranteed to have run.
    runtime.block_on(handle).expect("spawned task panicked");
}
2.3.3.2、创建带有线程池的runtime
js
use tokio::runtime;

/// Spawns 99 numbered tasks plus one extra task on a multi-thread runtime and waits
/// for all of them before returning.
fn main() {
    let runtime = runtime::Builder::new_multi_thread()
        .build()
        .expect("failed to build tokio runtime");

    // Keep every join handle so the tasks can be awaited below.
    let mut handles: Vec<_> = (1..100)
        .map(|number| {
            runtime.spawn(async move {
                println!("{}", number);
            })
        })
        .collect();
    println!("ok");
    handles.push(runtime.spawn(async move {
        println!("hello");
    }));

    // Dropping the runtime does not wait for outstanding tasks, so block until all
    // of them have finished.
    runtime.block_on(async {
        for handle in handles {
            handle.await.expect("spawned task panicked");
        }
    });
}
2.3.3.3、future::ready
js
use std::future;

use tokio::runtime;

/// Spawns an already-completed future, then a second task that awaits the first
/// one's join handle and prints its value.
fn main() {
    let runtime = runtime::Builder::new_multi_thread()
        .build()
        .expect("failed to build tokio runtime");
    let join_handle = runtime.spawn(future::ready(113));
    let printer = runtime.spawn(async move {
        println!("{}", join_handle.await.unwrap());
    });
    // Wait for the printing task; otherwise the runtime may be dropped before it runs.
    runtime.block_on(printer).expect("printer task panicked");
}
2.3.3.4、async main

对于main函数,tokio提供了简化的异步运行时创建方式

js
// The attribute lets `main` be declared `async`; the body here is intentionally empty.
#[tokio::main]
async fn main() {}

对于注解还有以下方式

js
#[tokio::main(flavor = "multi_thread"] // 等价于#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 10))]
#[tokio::main(worker_threads = 10))]

等价于以下代码

js
// Sketch of the hand-written form: `N` is a placeholder for the worker thread count
// and `...` for the body of the async `main`.
fn main(){
  tokio::runtime::Builder::new_multi_thread()
        .worker_threads(N)  
        .enable_all()
        .build()
        .unwrap()
        .block_on(async { ... });
}
js
#[tokio::main(flavor = "current_thread")]

// is equivalent to the following sketch (`...` is a placeholder for the body of the
// async `main`)
fn main() {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async { ... })
}

2.3.4、AsyncRead

2.3.4.1、 lines 按行读
js
use tokio::{fs, io::{self, AsyncBufReadExt}};

/// Prints `Cargo.toml` line by line using the `lines()` adaptor.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let source = fs::File::open("Cargo.toml").await?;
    let mut lines = io::BufReader::new(source).lines();
    loop {
        match lines.next_line().await? {
            Some(text) => println!("{}", text),
            // `None` marks the end of the file.
            None => break,
        }
    }

    Ok(())
}
2.3.4.2、 read_line 按行读
js
use tokio::{fs, io::{self, AsyncBufReadExt}};

/// Prints `Cargo.toml` line by line, reusing one `String` buffer with `read_line`.
///
/// # Errors
/// Returns any I/O error raised while opening or reading the file.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = fs::File::open("Cargo.toml").await?;
    let mut buf_reader = io::BufReader::new(file);
    let mut contents = String::new();
    // `read_line` returns 0 at end of file; `?` reports a read error to the caller
    // instead of silently ending the loop.
    while buf_reader.read_line(&mut contents).await? != 0 {
        // The line still carries its terminator, so `print!` avoids a doubled newline.
        print!("{}", contents);
        // `read_line` appends, so the buffer has to be emptied between lines.
        contents.clear();
    }

    Ok(())
}
2.3.4.3、read 按字节覆盖读取
js
use tokio::{fs::File, io::AsyncReadExt};

/// Reads `Cargo.toml` twice into the same 5-byte array and prints each chunk; the
/// second read overwrites what the first one stored.
#[tokio::main]
async fn main() {
    let mut f = File::open("Cargo.toml").await.unwrap();
    let mut chunk = [0; 5];

    for _ in 0..2 {
        let len = f.read(&mut chunk).await.unwrap();
        let text = std::str::from_utf8(&chunk[..len]).unwrap();
        println!("{}", text);
    }
}
2.3.4.4、read_buf 按字节追加读取
js
use bytes::BytesMut;
use tokio::{fs::File, io::AsyncReadExt};

/// Reads `Cargo.toml` twice with `read_buf`, which appends to a growable `BytesMut`
/// instead of overwriting it, and prints only the bytes added by each read.
#[tokio::main]
async fn main() {
    let mut f = File::open("Cargo.toml").await.unwrap();
    let mut buf = BytesMut::with_capacity(5);

    // First read: the data lands at the start of the buffer, so `buf[..first]` is
    // exactly what was read.
    let first = f.read_buf(&mut buf).await.unwrap();
    let str = std::str::from_utf8(&buf[..first]).unwrap();
    println!("{}", str);

    // Second read: the new bytes are appended after the existing ones, so the fresh
    // data is `buf[first..first + second]`, not `buf[..second]`.
    let second = f.read_buf(&mut buf).await.unwrap();
    let str = std::str::from_utf8(&buf[first..first + second]).unwrap();
    println!("{}", str);
}
2.3.4.5、read_exact

TIP

read_exact 必须恰好读满整个缓冲区; 如果剩余数据不足以填满缓冲区(例如文件小于缓冲区长度), 会返回 UnexpectedEof 错误

js
use tokio::{fs::File, io::AsyncReadExt};

/// Fills a 1024-byte array from `Cargo.toml` with `read_exact` and prints the result.
/// Any error from `read_exact` panics through `unwrap`.
#[tokio::main]
async fn main() {
    let mut f = File::open("Cargo.toml").await.unwrap();
    let mut block = [0; 1024];

    let len = f.read_exact(&mut block).await.unwrap();
    let text = std::str::from_utf8(&block[..len]);
    println!("first read {} bytes: {:?}", len, text);
}
2.3.4.6、read_to_string
2.3.4.7、read_to_end
js
use tokio::{fs::File, io::AsyncReadExt};

/// Reads all of `Cargo.toml` into a `Vec<u8>` and prints the byte count and the text.
#[tokio::main]
async fn main() {
    let mut f = File::open("Cargo.toml").await.unwrap();
    let mut bytes = Vec::<u8>::new();

    let total = f.read_to_end(&mut bytes).await.unwrap();
    let text = std::str::from_utf8(&bytes[..total]);
    println!("first read {} bytes: {:?}", total, text);
}
2.3.4.8、take

TIP

take限制最多读取的字节数

js
use tokio::{fs::File, io::AsyncReadExt};

/// Wraps the file in `take(5)`, reads through the wrapper into a 2-byte and a 3-byte
/// array, then sets the limit to 100 and reads twice into a 1024-byte array.
#[tokio::main]
async fn main() {
    let source = File::open("Cargo.toml").await.unwrap();
    let mut limited = source.take(5);

    let mut two = [0; 2];
    let got = limited.read(&mut two).await.unwrap();
    println!("{:?}", std::str::from_utf8(&two[..got]));

    let mut three = [0; 3];
    let got = limited.read(&mut three).await.unwrap();
    println!("{:?}", std::str::from_utf8(&three[..got]));

    let mut large = [0; 1024];
    limited.set_limit(100);
    for _ in 0..2 {
        let got = limited.read(&mut large).await.unwrap();
        println!("{:?}", std::str::from_utf8(&large[..got]));
    }
}
2.3.4.9、chain

TIP

chain可将两个Reader串联起来(可多次串联)

js
use tokio::{fs::File, io::AsyncReadExt};

/// Chains `a.txt` and `b.txt` into one reader and prints the result of two reads.
#[tokio::main]
async fn main() {
    let first = File::open("a.txt").await.unwrap();
    let second = File::open("b.txt").await.unwrap();
    let mut joined = first.chain(second);

    let mut chunk = [0; 20];
    for _ in 0..2 {
        let len = joined.read(&mut chunk).await.unwrap();
        println!("{:?}", std::str::from_utf8(&chunk[..len]));
    }
}
2.3.4.10、split(指定分隔符)
js
use tokio::{fs::File, io::{AsyncBufReadExt, self}};

/// Splits `a.txt` on the `\n` byte and prints each segment as UTF-8 text.
#[tokio::main]
async fn main() {
    let file = File::open("a.txt").await.unwrap();
    let mut segments = io::BufReader::new(file).split(b'\n');
    loop {
        let segment = match segments.next_segment().await.unwrap() {
            Some(bytes) => bytes,
            None => break,
        };
        println!("read line: {}", String::from_utf8(segment).unwrap());
    }
}
2.3.4.11、read_until(读到停止)
js
use tokio::{fs::File, io::{AsyncBufReadExt, self}};

/// Reads `a.txt` up to the first `\n` byte with `read_until` and prints what was read.
#[tokio::main]
async fn main() {
    let file = File::open("a.txt").await.unwrap();
    let mut reader = io::BufReader::new(file);

    let mut first_line = Vec::<u8>::new();
    reader.read_until(b'\n', &mut first_line).await.unwrap();
    let text = String::from_utf8(first_line).unwrap();
    print!("first line: {}", text);
}

2.3.5、AsyncWrite

2.3.5.1、write_all
js
use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};

/// Creates `test.txt` and writes the whole buffer to it with `write_all`.
///
/// # Errors
/// Returns any I/O error from creating, writing or flushing the file.
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("test.txt").await?;
    let buffer = b"Hello, world!";
    file.write_all(buffer).await?;
    // The tokio `File` documentation recommends calling `flush` before the file is
    // dropped so that all pending writes have completed.
    file.flush().await?;
    Ok(())
}
2.3.5.2、write
js
use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};

/// Creates `test.txt` and writes to it with a single `write` call.
///
/// # Errors
/// Returns any I/O error from creating, writing or flushing the file.
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("test.txt").await?;
    let payload = b"Hello, world!";
    // Unlike `write_all`, `write` may accept only part of the buffer, so check the count.
    let written = file.write(payload).await?;
    if written < payload.len() {
        eprintln!("partial write: {} of {} bytes", written, payload.len());
    }
    // The tokio `File` documentation recommends calling `flush` before the file is
    // dropped so that all pending writes have completed.
    file.flush().await?;
    Ok(())
}

2.3.6、tokio::spawn

js
use std::time::Duration;
use tokio::time::{sleep_until, Instant};

/// Spawns a task that sleeps for one second and prints `hello`, then waits for it.
///
/// # Errors
/// Returns an error if the spawned task panicked or was cancelled.
#[tokio::main]
async fn main() -> tokio::io::Result<()> {
    let handle = tokio::spawn(async {
        sleep_until(Instant::now() + Duration::from_secs(1)).await;
        println!("hello");
    });
    println!("----");
    // A `JoinError` converts into `io::Error`, so `?` can surface a failed task.
    handle.await?;
    // The declared return type needs an explicit success value.
    Ok(())
}

2.3.7、TcpListener

2.3.7.1、Server Client

server

js
use tokio::{net::{TcpListener, TcpStream}, io::{AsyncWriteExt, AsyncReadExt}};

/// Listens on 127.0.0.1:4000 and serves every accepted connection on its own task.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "127.0.0.1:4000";
    let acceptor = TcpListener::bind(addr).await?;
    println!("Listening on: {}", addr);

    loop {
        let (conn, _) = acceptor.accept().await?;
        tokio::spawn(async move {
            match handle_client(conn).await {
                Ok(()) => {}
                Err(e) => eprintln!("Error: {}", e),
            }
        });
    }
}

/// Sends a welcome message, then answers every read from the client with `Pong\n`
/// until a read returns 0 bytes.
async fn handle_client(mut stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    println!("new client connected");

    let mut scratch = [0; 1024];
    stream.write_all(b"Welcome to the PingPong server!").await?;
    // A read of 0 bytes ends the loop.
    while stream.read(&mut scratch).await? != 0 {
        stream.write_all(b"Pong\n").await?;
    }
    println!("client disconnected");

    Ok(())
}

client

js
use tokio::{net::TcpStream, io::{AsyncWriteExt, AsyncReadExt}};

/// Runs the client once and returns whatever `connect` returns.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    connect().await
}

/// Connects to 127.0.0.1:4000, sends `Ping\n` and prints the result of one read.
async fn connect() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "127.0.0.1:4000";
    let mut stream = TcpStream::connect(addr).await?;
    println!("Connected to: {}", addr);

    stream.write_all(b"Ping\n").await?;

    // Alternative kept for reference: keep reading until the read returns 0.
    // loop {
    //     let n = stream.read(&mut buf).await?;
    //     if n == 0 {
    //         break;
    //     }
    //     let pong = std::str::from_utf8(&buf[..n])?;
    //     println!("11--client: {}", pong);
    // }

    let mut reply = [0; 1024];
    let len = stream.read(&mut reply).await?;
    println!("--{}", std::str::from_utf8(&reply[..len])?);

    Ok(())
}
2.3.7.2、Server mpsc::Channel
js
use tokio::{net::{TcpListener, TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}}, sync::mpsc, io::{self, AsyncBufReadExt, AsyncWriteExt}};

/// Listens on 127.0.0.1:4000 and serves each connection on its own task; the accept
/// loop ends on the first accept error.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = TcpListener::bind("127.0.0.1:4000").await?;
    println!("Listening on: {}", server.local_addr()?);

    loop {
        let conn = match server.accept().await {
            Ok((conn, _)) => conn,
            Err(_) => break,
        };
        tokio::spawn(async move {
            match handle_client(conn).await {
                Ok(()) => {}
                Err(e) => eprintln!("Error: {}", e),
            }
        });
    }

    Ok(())
}

/// Splits the connection into a read half and a write half and runs one task per
/// half; the two tasks are linked through an mpsc channel of `String` messages.
async fn handle_client(client: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    println!("new client connected");
    let (client_reader, client_writer) = client.into_split();
    // Create the message channel (capacity 100).
    let (msg_tx, msg_rx) = mpsc::channel::<String>(100);

    // Task that reads from the client.
    let mut read_task = tokio::spawn(async move {
        read_from_client(client_reader, msg_tx).await;
    });
    // Task that writes to the client.
    let mut write_task = tokio::spawn(async move {
        write_from_client(client_writer, msg_rx).await;
    });

    // The handles are borrowed so they can still be aborted if the join fails.
    if tokio::try_join!(&mut read_task, &mut write_task).is_err() {
        eprintln!("read_task/write_task terminated");
        read_task.abort();
        write_task.abort();
    }
    Ok(())
}

/// Reads newline-terminated messages from the client and forwards each one, without
/// its line terminator, into `msg_tx`.
///
/// Returns when the client closes the connection, a read fails, or the receiving
/// side of the channel has been dropped.
async fn read_from_client(reader: OwnedReadHalf, msg_tx: mpsc::Sender<String>) {
    let mut buf_reader = io::BufReader::new(reader);
    let mut buf = String::new();
    loop {
        match buf_reader.read_line(&mut buf).await {
            Err(e) => {
                eprintln!("read from client error: {}", e);
                break;
            }
            Ok(0) => {
                println!("client closed");
                break;
            }
            Ok(n) => {
                // Strip only the terminator ("\n" or "\r\n"). A blind `pop()` would
                // leave the '\r' behind and would eat a real character when the last
                // line has no terminator.
                let content = buf.trim_end_matches(&['\r', '\n'][..]).to_string();
                // `read_line` appends, so empty the buffer for the next message.
                buf.clear();
                println!("read {} bytes from client. content: {}", n, content);
                if msg_tx.send(content).await.is_err() {
                    eprintln!("receiver closed");
                    break;
                }
            }
        }
    }
}

/// Takes messages from `msg_rx` and writes each one to the client followed by `\n`,
/// flushing after every message. Stops when the channel closes or a write fails.
async fn write_from_client(writer: OwnedWriteHalf, mut msg_rx: mpsc::Receiver<String>) {
    let mut buf_writer = io::BufWriter::new(writer);
    loop {
        let mut line = match msg_rx.recv().await {
            Some(line) => line,
            None => break,
        };
        println!("writer to client: {}", line);
        line.push('\n');
        if let Err(e) = buf_writer.write_all(line.as_bytes()).await {
            eprintln!("write to client failed: {}", e);
            break;
        }
        if let Err(e) = buf_writer.flush().await {
            eprintln!("flush to client failed: {}", e);
            break;
        }
    }
}

2.3.8、Timer

2.3.8.1、时间点 time::Instant
js
// Capture the current instant, then derive the instant three seconds after it.
let now = tokio::time::Instant::now();
let next_3 = now + tokio::time::Duration::from_secs(3);
2.3.8.2、睡眠 time::Sleep
js
// Sleep for 2 seconds.
time::sleep(time::Duration::from_secs(2)).await;

// Sleep until a deadline: the instant 2 seconds from now.
time::sleep_until(time::Instant::now() + time::Duration::from_secs(2)).await;

计算耗时(记录起始时间点, 休眠后计算经过的时间)

js
// Measure how long a 10-microsecond sleep actually took, in nanoseconds.
let started = time::Instant::now();
time::sleep(time::Duration::from_micros(10)).await;
let elapsed = time::Instant::now().duration_since(started);
println!("sleep {}", elapsed.as_nanos());

重置休眠时间

js
// NOTE(review): `now()` is a helper that is not defined in this snippet; presumably
// it returns a printable timestamp.
println!("start: {}", now());
let slp = time::sleep(time::Duration::from_secs(1));
// Pin the sleep so `as_mut().reset(..)` can be called on it.
tokio::pin!(slp);

// Move the deadline to 2 seconds from now.
slp.as_mut().reset(time::Instant::now() + time::Duration::from_secs(2));

slp.await;
println!("end: {}", now());
2.3.8.3、任务超时 time::Timeout
js
// Wrap a future that sleeps 6 seconds in a 5-second timeout.
// NOTE(review): `now()` is a helper that is not defined in this snippet.
let res = time::timeout(time::Duration::from_secs(5), async {
    println!("sleeping: {}", now());
    time::sleep(time::Duration::from_secs(6)).await;
    33
});

// `Err` is the timeout case; `Ok` carries the value the inner future produced.
match res.await {
    Err(_) => println!("task timeout: {}", now()),
    Ok(data) => println!("get the res '{}': {}", data, now()),
};
2.3.8.4、间隔任务 time::Interval
js
// NOTE(review): `now()` is a helper that is not defined in this snippet.
println!("before: {}", now());
let start = tokio::time::Instant::now() + tokio::time::Duration::from_secs(5);
let mut int_v = tokio::time::interval_at(start, tokio::time::Duration::from_secs(1));

// The first tick waits for the start instant, 5 seconds from now.
int_v.tick().await;
println!("task 1: {}", now());

// The next tick waits for the 1-second period.
int_v.tick().await;
println!("task 2: {}", now());

2.3.9、task的通信和同步

TIP

tokio提供几种不同功能的通道:

  • oneshot通道: 一对一发送的一次性通道,即该通道只能由一个发送者(Sender)发送最多一个数据,且只有一个接收者(Receiver)接收数据
  • mpsc通道: 多对一发送,即该通道可以同时有多个发送者向该通道发数据,但只有一个接收者接收数据
  • broadcast通道: 多对多发送,即该通道可以同时有多个发送者向该通道发送数据,也可以有多个接收者接收数据
  • watch通道: 一对多发送,即该通道只能有一个发送者向该通道发送数据,但可以有多个接收者接收数据
2.3.9.1、oneshot
js
// One sender, one receiver, at most one value.
let (tx, rx) = sync::oneshot::channel();
tokio::spawn(async move {
    // `send` fails only when the receiving half has already been dropped.
    if tx.send(33).is_err() {
        println!("receiver dropped");
    }
});
match rx.await {
    Ok(n) => println!("received {}", n),
    // Awaiting the receiver fails when the sender was dropped without sending.
    Err(_) => println!("sender dropped"),
}
2.3.9.2、mpsc

一个异步

js
// Channel of `i32` with capacity 10; one spawned task sends the values 1 through 10.
let (tx, mut rx) = sync::mpsc::channel::<i32>(10);
tokio::spawn(async move {
    for i in 1..=10 {
        if tx.send(i).await.is_err() {
            println!("receiver closed");
            return;
        }
    }
});

// The loop ends when `recv` returns `None`.
while let Some(i) = rx.recv().await {
    println!("received: {}", i);
}

多个异步

js
// Ten tasks, each with its own clone of the sender, each sending one value.
let (tx, mut rx) = sync::mpsc::channel::<i32>(10);
for i in 1..=10 {
    let tx = tx.clone();
    tokio::spawn(async move {
        if tx.send(i).await.is_err() {
            println!("receiver closed");
        }
    });
}
// Drop the original sender so that only the clones held by the tasks remain.
drop(tx);

// The loop ends when `recv` returns `None`.
while let Some(i) = rx.recv().await {
    println!("received: {}", i);
}
2.3.9.3、broadcast
js
// The channel retains at most 16 messages.
let (tx, mut rx) = tokio::sync::broadcast::channel(16);
let mut rx2 = tx.subscribe();
let handle1 = tokio::spawn(async move {
    loop {
        match rx.recv().await {
            Ok(n) => {
                println!("rx receive: {:?}", n);
            }
            Err(e) => {
                println!("err: {:?}", e);
                break;
            }
        }
    }
});
let handle2 = tokio::spawn(async move {
    loop {
        match rx2.recv().await {
            Ok(n) => {
                println!("rx2 receive: {:?}", n);
            }
            Err(e) => {
                println!("err: {:?}", e);
                break;
            }
        }
    }
});

tx.send(10).unwrap();
tx.send(20).unwrap();
// Close the channel. Without this the receivers wait forever for another message
// and the two awaits below never complete.
drop(tx);

handle1.await.unwrap();
handle2.await.unwrap();
2.3.9.4、watch
js
// A watch channel is created with an initial value stored in it.
let (tx, mut rx) = tokio::sync::watch::channel("hello");
let mut rx1 = rx.clone();

// Each receiver task prints the current value every time `changed()` succeeds.
tokio::spawn(async move {
    while rx1.changed().await.is_ok() {
        println!("changed1: {:?}", *rx1.borrow());
    }
});

tokio::spawn(async move {
    while rx.changed().await.is_ok() {
        println!("changed2: {:?}", *rx.borrow());
    }
});

tx.send("world").unwrap();

2.3.10、tokio::select!

server.rs

js
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpListener,
    sync::broadcast,
};

const SERVER_ADDR: &str = "127.0.0.1:8888";

/// Chat server: every line received from one client is broadcast to all tasks, and
/// each task forwards it to its own client unless that client was the sender.
///
/// # Errors
/// Returns an error if binding the listener or accepting a connection fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind(SERVER_ADDR).await?;
    let (tx, _rx) = broadcast::channel(12);
    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("{} connected", addr);
        let tx = tx.clone();
        let mut rx = tx.subscribe();
        tokio::spawn(async move {
            let (r, mut w) = socket.split();
            // `next_line` is cancellation safe, unlike `read_line`, which can lose a
            // partially read line when the other `select!` branch completes first.
            let mut lines = BufReader::new(r).lines();
            loop {
                tokio::select! {
                    res = lines.next_line() => {
                        let mut msg = match res {
                            Ok(Some(line)) => line,
                            // The client closed the connection.
                            Ok(None) => break,
                            Err(e) => {
                                eprintln!("read from {} failed: {}", addr, e);
                                break;
                            }
                        };
                        println!("server received {}: {}", addr, msg);
                        // `next_line` strips the terminator; restore it so receiving
                        // clients can keep reading line by line.
                        msg.push('\n');
                        // `send` fails only when there are no receivers; this task
                        // still holds `rx`, so the result can be ignored.
                        let _ = tx.send((msg, addr));
                    },
                    res = rx.recv() => {
                        match res {
                            Ok((msg_str, other_addr)) => {
                                if other_addr != addr {
                                    println!("server send to {}: {}", other_addr, msg_str);
                                    if let Err(e) = w.write_all(msg_str.as_bytes()).await {
                                        eprintln!("write to {} failed: {}", addr, e);
                                        break;
                                    }
                                }
                            }
                            // A slow receiver missed some messages; keep going
                            // instead of panicking.
                            Err(broadcast::error::RecvError::Lagged(skipped)) => {
                                eprintln!("{} lagged, skipped {} messages", addr, skipped);
                            }
                            Err(broadcast::error::RecvError::Closed) => break,
                        }
                    }
                }
            }
        });
    }
}

client.rs

js
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpStream,
    sync::mpsc,
};

const SERVER_ADDR: &str = "127.0.0.1:8888";

/// Chat client: a background task prints lines received from the server and writes
/// queued messages to it, while `main` reads lines from stdin and queues them.
///
/// # Errors
/// Returns an error if connecting fails, reading stdin fails, or the background
/// task has stopped and can no longer accept messages.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = TcpStream::connect(SERVER_ADDR).await?;
    println!("Connected to: {}", SERVER_ADDR);
    let (tx, mut rx) = mpsc::channel::<String>(3);
    tokio::spawn(async move {
        let (r, mut w) = client.into_split();
        // `next_line` is cancellation safe, unlike `read_line`, which can lose a
        // partially read line when the other `select!` branch completes first.
        let mut lines = BufReader::new(r).lines();
        loop {
            tokio::select! {
                res = lines.next_line() => {
                    match res {
                        Ok(Some(line)) => println!("read from server: {}", line),
                        // The server closed the connection.
                        Ok(None) => break,
                        Err(e) => {
                            eprintln!("read from server failed: {}", e);
                            break;
                        }
                    }
                }
                res = rx.recv() => {
                    let msg = match res {
                        Some(msg) => msg,
                        // The sending side (stdin loop) is gone.
                        None => break,
                    };
                    println!("write to server: {}", msg);
                    if let Err(e) = w.write_all(msg.as_bytes()).await {
                        eprintln!("write to server failed: {}", e);
                        break;
                    }
                }
            }
        }
    });
    println!("Enter something to send to server");
    loop {
        let mut buf = String::new();
        // Zero bytes means stdin reached end of input; stop instead of spinning.
        if std::io::stdin().read_line(&mut buf)? == 0 {
            break;
        }
        // Send the line with its terminator intact: the server reads line by line,
        // so a trimmed message would never be seen as complete.
        tx.send(buf).await?;
    }
    Ok(())
}

3、hyper

TIP

hyper是一个基于Rust语言的HTTP库, hyper-rustls是一个基于Rust语言的TLS库。

3.1、官网

https://hyper.rs/guides/0.14/

3.2、安装

js
[dependencies]
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.24.1"
tokio = { version = "1", features = ["full"] }

3.3、Server

https://github.com/hyperium/hyper/blob/0.14.x/examples

3.3.1、简单示例

js
use std::{net::SocketAddr, convert::Infallible};

use hyper::{service::{make_service_fn, service_fn}, Server, Request, Response, Body};

async fn hello_world(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok::<Response<Body>, Infallible>(Response::new("Hello, World".into()))
}

/// Serves `hello_world` on 127.0.0.1:5000 and prints the error if the server fails.
#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 5000));

    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(hello_world))
    });
    // Create the server and run it.
    match Server::bind(&addr).serve(make_svc).await {
        Ok(()) => {}
        Err(e) => eprintln!("service error: {}", e),
    }
}

3.3.2、https/http请求代理

Cargo.toml

js
[dependencies]
hyper = {version = "0.14", features = ["full"]}
tokio = {version = "1", features = ["full"]}
hyper-rustls = "0.22"
futures-util = "0.3"
anyhow = "1.0.37"

main.rs

js
use anyhow::*;
use std::sync::Arc;
use std::net::SocketAddr;
use hyper::{Body, Request, Server, Client};
use hyper::service::{make_service_fn, service_fn};
// hyper-rustls = "0.22" 版本过高不行

/// Runs a proxy on 0.0.0.0:7000: each incoming request is rewritten by `proxy_crate`
/// and sent out through an HTTPS-capable client.
///
/// # Errors
/// Returns an error (with context "Run server") if the server fails.
#[tokio::main]
async fn main() -> Result<()> {
    let https = hyper_rustls::HttpsConnector::with_native_roots();
    let client = Client::builder().build::<_, hyper::Body>(https);
    let client = Arc::new(client);

    let addr = SocketAddr::from(([0, 0, 0, 0], 7000));
    let make_svc = make_service_fn(move |_conn| {
        // One handle to the shared client per connection ...
        let client = Arc::clone(&client);
        async move {
            Ok(service_fn(
                move |mut req| {
                    // ... and one per request.
                    let client = Arc::clone(&client);
                    async move {
                        println!("proxy: {}", req.uri().path());
                        proxy_crate(&mut req)?;
                        client.request(req).await.context("proxy request")
                    }
                }
            ))
        }
    });

    // Propagate the failure; previously the error was wrapped and then discarded.
    Server::bind(&addr).serve(make_svc).await.context("Run server")?;

    Ok(())
}

/// Rewrites `req` in place so it targets `https://crates.io`, keeping the original
/// path and query string.
///
/// # Errors
/// Returns an error (with context "Parsing URI Error") if the rebuilt URI is invalid.
fn proxy_crate(req: &mut Request<Body>) -> Result<()> {
    // "host" is dropped as well: the incoming value names this proxy, and the hyper
    // client fills in a Host header from the request URI when none is present.
    for key in &[
        "host",
        "content-length",
        "accept-encoding",
        "content-encoding",
        "transfer-encoding",
    ] {
        req.headers_mut().remove(*key);
    }
    let uri = req.uri();
    let uri_string = match uri.query() {
        Some(query_item) => format!("https://crates.io{}?{}", uri.path(), query_item),
        None => format!("https://crates.io{}", uri.path()),
    };
    *req.uri_mut() = uri_string.parse().context("Parsing URI Error")?;
    Ok(())
}

3.3.3、api

js
use anyhow::{Result, Error};
use futures_util::{future::ok, TryStreamExt};
use hyper::{service::{make_service_fn, service_fn}, Request, Body, Server, Response, StatusCode, body, Method};

#[tokio::main]
async fn main() -> Result<()> {
    let addr = "127.0.0.1:3000".parse().unwrap();
    let service = make_service_fn(|_| async {
        Ok::<_, Error>(service_fn(echo))
    });
    
    let server = Server::bind(&addr).serve(service);
    println!("Listening on http://{}", addr);
    
    server.await?;

    Ok(())
}

/// Routes a request by method and path:
/// `GET /` returns a usage hint, `POST /echo` returns the body unchanged,
/// `POST /echo/uppercase` upper-cases the body chunk by chunk,
/// `POST /echo/reversed` returns the body with its bytes reversed,
/// and anything else gets a 404.
async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
    let response = match (req.method(), req.uri().path()) {
        (&Method::GET, "/") => Response::new(Body::from( "Try POSTing data to /echo such as: `curl localhost:3000/echo -XPOST -d 'hello world'`")),
        (&Method::POST, "/echo") => Response::new(req.into_body()),
        (&Method::POST, "/echo/uppercase") => {
            // Transform each chunk as it streams through.
            let upper = req.into_body().map_ok(|chunk| chunk.to_ascii_uppercase());
            Response::new(Body::wrap_stream(upper))
        }
        (&Method::POST, "/echo/reversed") => {
            // Reversing needs the whole body collected first.
            let whole = body::to_bytes(req.into_body()).await?;
            let reversed = whole.iter().rev().copied().collect::<Vec<u8>>();
            Response::new(Body::from(reversed))
        }
        _ => {
            let mut not_found = Response::default();
            *not_found.status_mut() = StatusCode::NOT_FOUND;
            not_found
        }
    };
    Ok(response)
}

3.3.4、http代理

js
use std::net::SocketAddr;

use anyhow::{Error, Result, Context, Ok};
use futures_util::TryStreamExt;
use hyper::{
    body,
    service::{make_service_fn, service_fn},
    Body, Client, Method, Request, Response, Server, StatusCode,
};

/// Listens on 127.0.0.1:3000 and forwards every request, with its path and query,
/// to 127.0.0.1:60244.
#[tokio::main]
async fn main() -> Result<()> {
    let addr: SocketAddr = "127.0.0.1:3000".parse()?;
    let out_addr: SocketAddr = "127.0.0.1:60244".parse()?;

    let client = Client::new();

    let service = make_service_fn(move |_| {
        let client = client.clone();
        // `SocketAddr` is `Copy`, so a plain copy is enough here.
        let upstream = out_addr;
        async move {
            Ok(service_fn(move |mut req| {
                let path_and_query = req
                    .uri()
                    .path_and_query()
                    .map(|x| x.as_str())
                    .unwrap_or("/");
                let uri_string = format!("http://{}{}", upstream, path_and_query);
                *req.uri_mut() = uri_string.parse().unwrap();
                client.request(req)
            }))
        }
    });

    let server = Server::bind(&addr).serve(service);
    println!("Listening on http://{}", addr);

    server.await?;

    Ok(())
}

3.4、Client

3.4.1、简单示例

js
use anyhow::*;
use hyper::Client;

/// Sends a GET request to httpbin and prints the status and the (unread) body handle.
#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::new();
    let resp = client.get("http://httpbin.org/ip".parse()?).await?;
    println!("Response: {}--{:?}", resp.status(), resp.body());

    Ok(())
}

3.4.2、打印body

js
use anyhow::Result;
use hyper::{Client, body::HttpBody};
use tokio::io::{stdout, AsyncWriteExt as _};

/// Sends a GET request to httpbin, prints the status, then copies the body to
/// stdout chunk by chunk.
#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::new();
    let uri = "http://httpbin.org/ip".parse()?;
    let mut resp = client.get(uri).await?;
    println!("{}", resp.status());

    loop {
        let chunk = match resp.body_mut().data().await {
            Some(chunk) => chunk?,
            // `None` means there are no more chunks.
            None => break,
        };
        // Debugging alternative: println!("{:?}", chunk);
        stdout().write_all(&chunk).await?;
    }
    Ok(())
}

3.4.3、post

js
use anyhow::Result;
use hyper::{Client, Request, Body, Method};

/// Posts a small JSON document to httpbin and prints the response body as text.
#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::new();

    let payload = Body::from(r#"{"key":"value"}"#);
    let req = Request::builder()
        .method(Method::POST)
        .uri("http://httpbin.org/post")
        .header("content-type", "application/json")
        .body(payload)?;

    let resp = client.request(req).await?;
    // Collect the body into bytes, then render the bytes as (lossy) UTF-8.
    let bytes = hyper::body::to_bytes(resp.into_body()).await?;
    let text = String::from_utf8_lossy(&bytes);
    println!("body: {:?}", text);
    Ok(())
}

3.4.4、body to json

js
use anyhow::Result;
use hyper::{Client, body::Buf};
use serde::Deserialize;

// The two fields `main` prints for every user it deserializes from the response.
#[derive(Deserialize, Debug)]
struct User {
    id: i32,
    name: String,
}

/// Fetches the user list from jsonplaceholder, deserializes it into `Vec<User>` and
/// prints each user's id and name.
#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::new();
    let uri = "http://jsonplaceholder.typicode.com/users".parse()?;
    let res = client.get(uri).await?;
    // Aggregate the body into one buffer and parse JSON straight from its reader.
    let buffered = hyper::body::aggregate(res).await?;
    let users: Vec<User> = serde_json::from_reader(buffered.reader())?;

    users
        .iter()
        .for_each(|user| println!("id: {}, name: {}", user.id, user.name));

    Ok(())
}

4、anyhow(异常捕捉)

4.1、安装

js
[dependencies]
anyhow = "1.0.75"

4.2、使用

4.2.1、Result

js
use std::fs::File;
use anyhow::Result;

// Tries to open `Cargo1.toml`; `?` converts a failure into the `anyhow` error that
// `main` returns.
// NOTE(review): the file name looks deliberately wrong to trigger the error path — confirm.
fn main() -> Result<()> {
    File::open("Cargo1.toml")?;
    Ok(())
}

4.2.2、Context

js
use anyhow::{Result, Context};
use hyper::Client;

// Sends a GET request and attaches the message "context" to a request failure.
// NOTE(review): the `http1://` scheme looks deliberately invalid to trigger the
// error path — confirm.
#[tokio::main]
async fn main() -> Result<()> {
  let client = Client::new();
  let uri = "http1://httpbin.org/ip".parse()?;

  let resp = client.get(uri).await.context("context")?;
  println!("Response: {}--{:?}", resp.status(), resp.body());

  Ok(())
}

4.2.3、with_context

js
use std::fs;
use anyhow::{Result, Context};

fn main() -> Result<()> {
    let path = "Cargo1.toml";
    let content = fs::read(path)        
        .with_context(|| format!("Failed to read instrs from {}", path))?;
    println!("{:?}", content);
    Ok(())
}

5、reqwest(请求库)

5.1、安装

js
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }

5.2、get请求

js
use std::collections::HashMap;

/// Fetches httpbin's `/ip` endpoint and prints the JSON object as a string map.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let response = reqwest::get("https://httpbin.org/ip").await?;
    let resp = response.json::<HashMap<String, String>>().await?;
    println!("{:#?}", resp);

    Ok(())
}

5.3、post请求

Cargo.toml

js
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
js
use serde::{Serialize, Deserialize};
use serde_json::json;

#[derive(Debug, Serialize, Deserialize)]
struct User {
   userId: u16,
   id: u16,
   title: String,
   body: String
}

/// Posts a JSON document to jsonplaceholder and prints the response parsed as `User`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let payload = json!({
        "userId": 1,
        "title": "foo",
        "body": "bar"
    });

    let client = reqwest::Client::new();
    let request = client
        .post("http://jsonplaceholder.typicode.com/posts")
        .json(&payload);
    let post = request.send().await?;

    let resp = post.json::<User>().await?;
    println!("{:#?}", resp);

    Ok(())
}

6、structopt(命令行参数转结构体)

6.1、安装

js
[dependencies]
structopt = "0.3"

6.2、使用

js
use std::path::PathBuf;

use structopt::StructOpt;

/// A basic example
#[derive(StructOpt, Debug)]
#[structopt(name = "basic")] // name of the command-line program
struct Opt {
    /// Activate debug mode
    #[structopt(short, long)]
    debug: bool,

    /// Set speed
    // we don't want to name it "speed", need to look smart
    #[structopt(short = "s", long = "speed", default_value = "42")]
    speed: f64,

    /// Input file
    #[structopt(parse(from_os_str))]
    input: PathBuf,

    /// Output file, stdout if not present
    #[structopt(parse(from_os_str))]
    output: Option<PathBuf>,

    /// Output file, stdout if not present
    // Same as `output`, but with the argument name overridden to "file".
    #[structopt(name = "file", parse(from_os_str))]
    output2: Option<PathBuf>,

    /// Where to write the output: to `stdout` or `file`
    #[structopt(short)]
    out_type: String,
}

// Parses the command line into `Opt` and pretty-prints the result.
fn main() {
    let opt = Opt::from_args();
    println!("{:#?}", opt);
}

7、egui(GUI库)

7.1、安装

js
[dependencies]
eframe = "0.23"
egui_extras = { version = "0.23", features = ["all_loaders"] }
image = { version = "0.24", features = ["jpeg", "png"] }

7.2、使用

7.2.1、基础控件

js
use eframe::egui;

// Application state edited by the UI: a name (text field) and an age (slider/button).
#[derive(Default)]
struct MyApp {
    name: String,
    age: u32,
}

impl eframe::App for MyApp {
    /// Draws the panel: a heading, a labelled name field, an age slider, a button
    /// that adds one year, a greeting line and the logo image.
    fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
        egui::CentralPanel::default().show(ctx, |ui| {
            ui.heading("My egui Application");
            ui.horizontal(|row| {
                let name_label = row.label("Your name: ");
                row.text_edit_singleline(&mut self.name)
                    .labelled_by(name_label.id);
            });

            let age_slider = egui::Slider::new(&mut self.age, 0..=120).text("age");
            ui.add(age_slider);
            if ui.button("Click each year").clicked() {
                self.age += 1;
            }

            let greeting = format!("Hello '{}', age {}", self.name, self.age);
            ui.label(greeting);

            ui.image(egui::include_image!("../assets/logo.png"));
        });
    }
}

fn main() -> Result<(), eframe::Error>{
    let options = eframe::NativeOptions {
        initial_window_size: Some(egui::vec2(640.0, 600.0)),
        ..Default::default()
    };
    eframe::run_native(
        "My egui App",
        options,
        Box::new(|cc| {
          egui_extras::install_image_loaders(&cc.egui_ctx);
          Box::new(MyApp::default())
        }),
    )
}

7.2.2、confirm确认框

js
use eframe::egui;

// State for the quit-confirmation flow.
#[derive(Default)]
struct MyApp {
    // Returned from `on_close_event`; set to true when the user clicks "Yes!".
    allowed_to_close: bool,
    // Whether the "Do you want to quit?" window is currently drawn.
    show_confirmation_dialog: bool,
}

impl eframe::App for MyApp {
    // Requests the confirmation dialog and returns `allowed_to_close`.
    fn on_close_event(&mut self) -> bool {
        self.show_confirmation_dialog = true;
        self.allowed_to_close
    }

    // Draws the main panel and, while requested, the confirmation window.
    fn update(&mut self, ctx: &egui::Context, frame: &mut eframe::Frame) {
      egui::CentralPanel::default().show(ctx, |ui| {
          ui.heading("My egui Application");
      });
      if self.show_confirmation_dialog {
          egui::Window::new("Do you want to quit?")
              .collapsible(false)
              .resizable(false)
              .show(ctx, |ui| {
                  ui.horizontal(|ui| {
                      // "Cancel" just hides the dialog again.
                      if ui.button("Cancel").clicked() {
                          self.show_confirmation_dialog = false;
                      }

                      // "Yes!" sets the flag that `on_close_event` returns, then
                      // asks the frame to close.
                      if ui.button("Yes!").clicked() {
                          self.allowed_to_close = true;
                          frame.close();
                      }
                  });
              });
      }
    }
}

fn main() -> Result<(), eframe::Error>{
    let options = eframe::NativeOptions {
        initial_window_size: Some(egui::vec2(640.0, 600.0)),
        ..Default::default()
    };
    eframe::run_native(
        "My egui App",
        options,
        Box::new(|cc| {
          Box::new(MyApp::default())
        }),
    )
}

7.2.3、键盘事件

js
// NOTE(review): `Content` is not defined in this snippet; the code below uses only
// its `text` field, which supports `clear` and `push_str`.
impl eframe::App for Content {
  // Appends a line to `text` for each press, hold and release of the A key and shows
  // the text in a scroll area.
  fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
      egui::CentralPanel::default().show(ctx, |ui| {
          ui.heading("Press/Hold/Release example. Press A to test.");
          if ui.button("Clear").clicked() {
              self.text.clear();
          }
          egui::ScrollArea::vertical()
              .auto_shrink([false; 2])
              .stick_to_bottom(true)
              .show(ui, |ui| {
                  ui.label(&self.text);
              });

          if ctx.input(|i| i.key_pressed(egui::Key::A)) {
              self.text.push_str("\nPressed");
          }
          if ctx.input(|i| i.key_down(egui::Key::A)) {
              self.text.push_str("\nHeld");
              ui.ctx().request_repaint(); // make sure we note the holding.
          }
          if ctx.input(|i| i.key_released(egui::Key::A)) {
              self.text.push_str("\nReleased");
          }
      });
  }
}

8、derive_builder

8.1、安装

js
[dependencies]
derive_builder = "0.12"

8.2、使用

8.2.1、基本使用

js
// NOTE(review): `ChannelBuilder` is not defined in this snippet; presumably it is
// generated by `#[derive(Builder)]` on a `Channel` struct.
let mut ch = ChannelBuilder::default();
ch.ipsum("world");
let ch = ch.build().unwrap();
println!("{:?}", ch);

8.2.2、setter

js
use derive_builder::Builder;

// Struct with a derived builder. Both fields are `Option<String>` and both setters
// use `into` and `strip_option`; only `foo` is additionally marked `default`.
#[derive(Builder, Debug)]
struct Channel {
    #[builder(setter(into, strip_option))]
    ipsum: Option<String>,
    #[builder(setter(into, strip_option), default)]
    foo: Option<String>,
}

/// Builds a `Channel` with only `ipsum` set and prints it.
fn main() {
    let mut builder = ChannelBuilder::default();
    builder.ipsum("hello");
    let ch = builder.build().unwrap();
    println!("{:?}", ch);
}

8.2.3、重命名builder

js
use derive_builder::Builder;

// Same struct with the generated builder renamed to `FooConstructor`.
#[derive(Builder, Debug)]
#[builder(name = "FooConstructor")]
struct Channel {
    #[builder(setter(into, strip_option))]
    ipsum: Option<String>,
    #[builder(setter(into, strip_option), default)]
    foo: Option<String>,
    #[builder(setter(skip))] // no setter is generated; the field takes its default value
    setter_skipped: String,
}

/// Builds a `Channel` through the renamed builder and prints it.
fn main() {
    // Not available because the field is marked `setter(skip)`:
    // ch.setter_skipped("heee".to_string());
    let ch = FooConstructor::default().ipsum("world").build().unwrap();
    println!("{:?}", ch);
}

Released under the MIT License.