常用三方库
1、cargo-watch
1.1、安装
地址: https://crates.io/crates/cargo-watch
shell
cargo install cargo-watch
1.2、使用
shell
cargo watch -q -c -w src -x run
1.3、查看帮助
shell
cargo watch --help
2、tokio
2.1、官网
https://tokio.rs/tokio/tutorial
2.2、安装
js
[dependencies]
tokio = { version = "1.0", features = ["full"] }
2.3、使用
2.3.1、async
js
use std::{future::Future, time::Duration};
use tokio::time;
#[tokio::main]
async fn main() {
let h1 = tokio::spawn(async {
println!("asnyc1 result: {:?}", async1().await);
println!("asnyc2 result is {}", async2().await);
});
let _ = tokio::join!(h1);
}
async fn async1() -> String {
time::sleep(Duration::from_secs(1)).await;
"async1".to_string()
}
// 与上面写法一致
fn async2() -> impl Future<Output = String> {
async {
time::sleep(Duration::from_secs(1)).await;
"async2".to_string()
}
}
2.3.2、简单示例
js
#[tokio::main]
async fn main() {
let op = say_world();
println!("hello world");
op.await;
}
async fn say_world() {
println!("world");
}
2.3.3、runtime
TIP
runtime
不能在异步线程中执行
2.3.3.1、创建runtime
js
use tokio::runtime;
fn main() {
let runtime = runtime::Runtime::new().unwrap();
runtime.spawn(async {
println!("Hello, world!");
});
println!("Hello, world!22222");
}
2.3.2.1、创建带有线程池的runtime
js
use tokio::runtime;
fn main() {
let runtime = runtime::Builder::new_multi_thread().build().unwrap();
for number in 1..100 {
let future = async move {
println!("{}", number);
};
runtime.spawn(future);
}
println!("ok");
runtime.spawn(async move {
println!("hello");
});
}
2.3.2.2、future::ready
js
use std::future;
use tokio::runtime;
fn main() {
let runtime = runtime::Builder::new_multi_thread().build().unwrap();
let join_handle = runtime.spawn(future::ready(113));
runtime.spawn(async move {
println!("{}", join_handle.await.unwrap());
});
}
2.3.2.3、async main
对于main函数,tokio提供了简化的异步运行时创建方式
js
#[tokio::main]
async fn main() {}
对于注解还有以下方式
js
#[tokio::main(flavor = "multi_thread"] // 等价于#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 10))]
#[tokio::main(worker_threads = 10))]
等价于以下代码
js
fn main(){
tokio::runtime::Builder::new_multi_thread()
.worker_threads(N)
.enable_all()
.build()
.unwrap()
.block_on(async { ... });
}
js
#[tokio::main(flavor = "current_thread")]
// 等价于
fn main() {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { ... })
}
2.3.4、AsyncRead
2.3.4.1、 lines 按行读
js
use tokio::{fs, io::{self, AsyncBufReadExt}};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let file = fs::File::open("Cargo.toml").await?;
let mut buf_reader = io::BufReader::new(file).lines();
while let Some(line) = buf_reader.next_line().await? {
println!("{}", line);
}
Ok(())
}
2.3.4.2、 read_line 按行读
js
use tokio::{fs, io::{self, AsyncBufReadExt}};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let file = fs::File::open("Cargo.toml").await?;
let mut buf_reader = io::BufReader::new(file);
let mut contents = String::new();
while let Ok(n) = buf_reader.read_line(&mut contents).await {
if n == 0 {
break;
}
println!("{}", contents);
contents.clear();
}
Ok(())
}
2.3.4.3、read 按字节覆盖读取
js
use tokio::{fs::File, io::AsyncReadExt};
#[tokio::main]
async fn main() {
let mut f = File::open("Cargo.toml").await.unwrap();
let mut buf = [0; 5];
let n = f.read(&mut buf).await.unwrap();
let str = std::str::from_utf8(&buf[..n]).unwrap();
println!("{}", str);
let n = f.read(&mut buf).await.unwrap();
let str = std::str::from_utf8(&buf[..n]).unwrap();
println!("{}", str);
}
2.3.4.4、read_buf 按字节追加读取
js
use bytes::BytesMut;
use tokio::{fs::File, io::AsyncReadExt};
#[tokio::main]
async fn main() {
let mut f = File::open("Cargo.toml").await.unwrap();
let mut buf = BytesMut::with_capacity(5);
// 第一次读取,读取容量大小的数据,即4字节数据,
// 此时BytesMut内部的位移指针在offset = 3处
let n = f.read_buf(&mut buf).await.unwrap();
let str = std::str::from_utf8(&buf[..n]).unwrap();
println!("{}", str);
// 第二次读取,因buf已满,这次将一次性读取剩余所有数据(只请求一次读系统调用),
// BytesMut也将自动扩容以便存放更多数据,且可能会根据所读数据的多少进行多次扩容,
// 所读数据都将从index=4处开始追加保存, 遇到空行会终止
let n = f.read_buf(&mut buf).await.unwrap();
let str = std::str::from_utf8(&buf[..n]).unwrap();
println!("{}", str);
}
2.3.4.5、read_exact
TIP
读取超出容量会报错
js
use tokio::{fs::File, io::AsyncReadExt};
#[tokio::main]
async fn main() {
let mut f = File::open("Cargo.toml").await.unwrap();
let mut buf = [0; 1024];
let n = f.read_exact(&mut buf).await.unwrap();
println!("first read {} bytes: {:?}", n, std::str::from_utf8(&buf[..n]));
}
2.3.4.6、read_to_string
2.3.4.7、read_to_end
js
use tokio::{fs::File, io::AsyncReadExt};
#[tokio::main]
async fn main() {
let mut f = File::open("Cargo.toml").await.unwrap();
let mut buf:Vec<u8> = Vec::new();
let n = f.read_to_end(&mut buf).await.unwrap();
let str = std::str::from_utf8(&buf[..n]);
println!("first read {} bytes: {:?}", n, str);
}
2.3.4.8、take
TIP
take
限制最多读取的字节数
js
use tokio::{fs::File, io::AsyncReadExt};
#[tokio::main]
async fn main() {
let f = File::open("Cargo.toml").await.unwrap();
let mut t = f.take(5);
let mut buf = [0; 2];
let n = t.read(&mut buf).await.unwrap();
println!("{:?}", std::str::from_utf8(&buf[..n]));
let mut buf = [0; 3];
let n = t.read(&mut buf).await.unwrap();
println!("{:?}", std::str::from_utf8(&buf[..n]));
let mut buf = [0; 1024];
t.set_limit(100);
let n = t.read(&mut buf).await.unwrap();
println!("{:?}", std::str::from_utf8(&buf[..n]));
let n = t.read(&mut buf).await.unwrap();
println!("{:?}", std::str::from_utf8(&buf[..n]));
}
2.3.4.9、chain
TIP
chain
可将两个Reader
串联起来(可多次串联)
js
use tokio::{fs::File, io::AsyncReadExt};
#[tokio::main]
async fn main() {
let f = File::open("a.txt").await.unwrap();
let f2 = File::open("b.txt").await.unwrap();
let mut t = f.chain(f2);
let mut buf = [0; 20];
let n = t.read(&mut buf).await.unwrap();
println!("{:?}", std::str::from_utf8(&buf[..n]));
let n = t.read(&mut buf).await.unwrap();
println!("{:?}", std::str::from_utf8(&buf[..n]));
}
2.3.4.10、split(指定分隔符)
js
use tokio::{fs::File, io::{AsyncBufReadExt, self}};
#[tokio::main]
async fn main() {
let f = File::open("a.txt").await.unwrap();
let mut lines = io::BufReader::new(f).split(b'\n');
while let Some(line) = lines.next_segment().await.unwrap() {
println!("read line: {}", String::from_utf8(line).unwrap());
}
}
2.3.4.11、read_until(读到停止)
js
use tokio::{fs::File, io::{AsyncBufReadExt, self}};
#[tokio::main]
async fn main() {
let f = File::open("a.txt").await.unwrap();
let mut f = io::BufReader::new(f);
let mut data:Vec<u8> = Vec::new();
f.read_until(b'\n', &mut data).await.unwrap();
print!("first line: {}", String::from_utf8(data).unwrap());
}
2.3.5、AsyncWrite
2.3.5.1、write_all
js
use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("test.txt").await?;
let buffer = b"Hello, world!";
file.write_all(buffer).await?;
Ok(())
}
2.3.5.2、write
js
use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("test.txt").await?;
file.write(b"Hello, world!").await?;
Ok(())
}
2.3.6、tokio::spawn
js
use std::time::Duration;
use tokio::time::{sleep_until, Instant};
#[tokio::main]
async fn main() -> tokio::io::Result<()> {
let handle = tokio::spawn(async {
sleep_until(Instant::now() + Duration::from_secs(1)).await;
println!("hello");
});
println!("----");
let _ = handle.await;
}
2.3.7、TcpListener
2.3.7.1、Server Client
server
js
use tokio::{net::{TcpListener, TcpStream}, io::{AsyncWriteExt, AsyncReadExt}};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "127.0.0.1:4000";
let listener = TcpListener::bind(addr).await?;
println!("Listening on: {}", addr);
loop {
let (stream, _) = listener.accept().await?;
tokio::spawn(async move {
if let Err(e) = handle_client(stream).await {
eprintln!("Error: {}", e);
}
});
}
}
async fn handle_client(mut stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
println!("new client connected");
let mut buf = [0; 1024];
stream.write_all(b"Welcome to the PingPong server!").await?;
loop {
let n = stream.read(&mut buf).await?;
if n == 0 {
break;
}
stream.write_all(b"Pong\n").await?;
}
println!("client disconnected");
Ok(())
}
client
js
use tokio::{net::TcpStream, io::{AsyncWriteExt, AsyncReadExt}};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
connect().await?;
Ok(())
}
async fn connect() -> Result<(), Box<dyn std::error::Error>> {
let addr = "127.0.0.1:4000";
let mut stream = TcpStream::connect(addr).await?;
println!("Connected to: {}", addr);
let mut buf = [0; 1024];
stream.write_all(b"Ping\n").await?;
// loop {
// let n = stream.read(&mut buf).await?;
// if n == 0 {
// break;
// }
// let pong = std::str::from_utf8(&buf[..n])?;
// println!("11--client: {}", pong);
// }
let n = stream.read(&mut buf).await?;
let pong = std::str::from_utf8(&buf[..n])?;
println!("--{}", pong);
Ok(())
}
2.3.7.2、Server mspc::Channel
js
use tokio::{net::{TcpListener, TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}}, sync::mpsc, io::{self, AsyncBufReadExt, AsyncWriteExt}};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = TcpListener::bind("127.0.0.1:4000").await?;
println!("Listening on: {}", server.local_addr()?);
while let Ok((stream, _)) = server.accept().await {
tokio::spawn(async move {
if let Err(e) = handle_client(stream).await {
eprintln!("Error: {}", e);
}
});
}
Ok(())
}
async fn handle_client(client: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
println!("new client connected");
let (client_reader, client_writer) = client.into_split();
// 创建消息通道
let (msg_tx, msg_rx) = mpsc::channel::<String>(100);
// 从客户端读取异步任务
let mut read_task = tokio::spawn(async move {
read_from_client(client_reader, msg_tx).await;
});
// 向客户端写入异步任务
let mut write_task = tokio::spawn(async move {
write_from_client(client_writer, msg_rx).await;
});
if tokio::try_join!(&mut read_task, &mut write_task).is_err() {
eprintln!("read_task/write_task terminated");
read_task.abort();
write_task.abort();
}
Ok(())
}
// 从客户端读
async fn read_from_client(reader: OwnedReadHalf, msg_tx: mpsc::Sender<String>) {
let mut buf_reader = io::BufReader::new(reader);
let mut buf = String::new();
loop {
match buf_reader.read_line(&mut buf).await {
Err(_) => {
eprintln!("read from client error");
break;
},
Ok(0) => {
println!("client closed");
break;
},
Ok(n) => {
buf.pop();
let content = buf.drain(..).as_str().to_string();
println!("read {} bytes from client. content: {}", n, content);
if msg_tx.send(content).await.is_err() {
eprintln!("receiver closed");
break;
}
},
}
}
}
// 給客户端写
async fn write_from_client(writer: OwnedWriteHalf, mut msg_rx: mpsc::Receiver<String>) {
let mut buf_writer = io::BufWriter::new(writer);
while let Some(mut msg) = msg_rx.recv().await {
println!("writer to client: {}", msg);
msg.push('\n');
if let Err(e) = buf_writer.write_all(msg.as_bytes()).await {
eprintln!("write to client failed: {}", e);
break;
}
if let Err(e) = buf_writer.flush().await {
eprintln!("flush to client failed: {}", e);
break;
}
}
}
2.3.8、Timer
2.3.8.1、时间点 time::Instant
js
let now = tokio::time::Instant::now();
let next_3 = now + tokio::time::Duration::from_secs(3);
2.3.8.2、睡眠 time::Sleep
js
// 睡眠2秒
time::sleep(time::Duration::from_secs(2)).await;
// 一直睡眠,睡到2秒后醒来
time::sleep_until(time::Instant::now() + time::Duration::from_secs(2)).await;
获取当前时间
js
let start = time::Instant::now();
time::sleep(time::Duration::from_micros(10)).await;
println!("sleep {}", time::Instant::now().duration_since(start).as_nanos());
重置休眠时间
js
println!("start: {}", now());
let slp = time::sleep(time::Duration::from_secs(1));
tokio::pin!(slp);
slp.as_mut().reset(time::Instant::now() + time::Duration::from_secs(2));
slp.await;
println!("end: {}", now());
2.3.8.3、任务超时 time::Timeout
js
let res = time::timeout(time::Duration::from_secs(5), async {
println!("sleeping: {}", now());
time::sleep(time::Duration::from_secs(6)).await;
33
});
match res.await {
Err(_) => println!("task timeout: {}", now()),
Ok(data) => println!("get the res '{}': {}", data, now()),
};
2.3.8.4、间隔任务 time::Interval
js
println!("before: {}", now());
let start = tokio::time::Instant::now() + tokio::time::Duration::from_secs(5);
let mut int_v = tokio::time::interval_at(start, tokio::time::Duration::from_secs(1));
// 任务阻塞,5秒后被唤醒
int_v.tick().await;
println!("task 1: {}", now());
// 该计时任务"阻塞",直到1秒后被唤醒
int_v.tick().await;
println!("task 2: {}", now());
2.3.9、task的通信和同步
TIP
tokio提供几种不同功能的通道:
- oneshot通道: 一对一发送的一次性通道,即该通道只能由一个发送者(Sender)发送最多一个数据,且只有一个接收者(Receiver)接收数据
- mpsc通道: 多对一发送,即该通道可以同时有多个发送者向该通道发数据,但只有一个接收者接收数据
- broadcast通道: 多对多发送,即该通道可以同时有多个发送者向该通道发送数据,也可以有多个接收者接收数据
- watch通道: 一对多发送,即该通道只能有一个发送者向该通道发送数据,但可以有多个接收者接收数据
2.3.9.1、oneshot
js
// 1对1
let (tx, rx) = sync::oneshot::channel();
tokio::spawn(async move {
if tx.send(33).is_err() {
println!("receiver dropped");
}
});
match rx.await {
Ok(n) => println!("received {}", n),
Err(_) => println!("receiver dropped"),
}
2.3.9.2、mpsc
一个异步
js
let (tx, mut rx) = sync::mpsc::channel::<i32>(10);
tokio::spawn(async move {
for i in 1..=10 {
if tx.send(i).await.is_err() {
println!("receiver closed");
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("received: {}", i);
}
多个异步
js
let (tx, mut rx) = sync::mpsc::channel::<i32>(10);
for i in 1..=10 {
let tx = tx.clone();
tokio::spawn(async move {
if tx.send(i).await.is_err() {
println!("receiver closed");
}
});
}
drop(tx);
while let Some(i) = rx.recv().await {
println!("received: {}", i);
}
2.3.9.3、broadcast
js
// 最多存放16个消息
let (tx, mut rx) = tokio::sync::broadcast::channel(16);
let mut rx2 = tx.subscribe();
let handle1 = tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(n) => {
println!("rx receive: {:?}", n);
},
Err(e) => {
println!("err: {:?}", e);
break;
}
}
}
});
let handle2 = tokio::spawn(async move {
loop {
match rx2.recv().await {
Ok(n) => {
println!("rx2 receive: {:?}", n);
},
Err(e) => {
println!("err: {:?}", e);
break;
}
}
}
});
tx.send(10).unwrap();
tx.send(20).unwrap();
handle1.await.unwrap();
handle2.await.unwrap();
2.3.9.4、watch
js
// 创建watch通道时,需指定一个初始值存放在通道中
let (tx, mut rx) = tokio::sync::watch::channel("hello");
let mut rx1 = rx.clone();
tokio::spawn(async move {
while rx1.changed().await.is_ok() {
println!("changed1: {:?}", *rx1.borrow());
}
});
tokio::spawn(async move {
while rx.changed().await.is_ok() {
println!("changed2: {:?}", *rx.borrow());
}
});
tx.send("world").unwrap();
2.3.10、tokio::select!
server.rs
js
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpListener,
sync::broadcast,
};
const SERVER_ADDR: &str = "127.0.0.1:8888";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(SERVER_ADDR).await?;
let (tx, _rx) = broadcast::channel(12);
loop {
let (mut socket, addr) = listener.accept().await?;
println!("{} connected", addr);
let tx = tx.clone();
let mut rx = tx.subscribe();
tokio::spawn(async move {
let (r, mut w) = socket.split();
let mut reader = BufReader::new(r);
let mut msg = String::new();
loop {
tokio::select! {
res = reader.read_line(&mut msg) => {
if res.unwrap() == 0 {
break;
}
println!("server received {}: {}", addr, msg);
tx.send((msg.clone(), addr)).unwrap();
msg.clear();
},
res = rx.recv() => {
let (msg_str, other_addr) = res.unwrap();
if other_addr != addr {
println!("server send to {}: {}", other_addr, msg_str);
w.write_all(msg_str.as_bytes()).await.unwrap();
}
}
}
}
});
}
}
client.rs
js
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpStream,
sync::mpsc,
};
const SERVER_ADDR: &str = "127.0.0.1:8888";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = TcpStream::connect(SERVER_ADDR).await?;
println!("Connected to: {}", SERVER_ADDR);
let (tx, mut rx) = mpsc::channel::<String>(3);
tokio::spawn(async move {
let (r, mut w) = client.into_split();
let mut reader = BufReader::new(r);
let mut line = String::new();
loop {
tokio::select! {
res = reader.read_line(&mut line) => {
if res.unwrap() == 0 {
break;
}
println!("read from server: {}", line);
line.clear();
}
res = rx.recv() => {
let msg = res.unwrap();
println!("write to server: {}", msg);
w.write_all(msg.as_bytes()).await.unwrap();
}
}
}
});
println!("Enter something to send to server");
loop {
let mut buf = String::new();
std::io::stdin().read_line(&mut buf)?;
// 不能写成buf.trim().to_string(), 使用tcp发送有问题
tx.send(buf.clone()).await?;
buf.clear();
}
}
3、hyper
TIP
hyper是一个基于Rust语言的HTTP库, hyper-rustls是一个基于Rust语言的TLS库。
3.1、官网
3.2、安装
js
[dependencies]
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.24.1"
tokio = { version = "1", features = ["full"] }
3.3、Server
https://github.com/hyperium/hyper/blob/0.14.x/examples
3.3.1、简单示例
js
use std::{net::SocketAddr, convert::Infallible};
use hyper::{service::{make_service_fn, service_fn}, Server, Request, Response, Body};
async fn hello_world(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok::<Response<Body>, Infallible>(Response::new("Hello, World".into()))
}
#[tokio::main]
async fn main() {
let addr = SocketAddr::from(([127, 0, 0, 1], 5000));
let make_svc = make_service_fn(|_conn| async {
Ok::<_, Infallible>(service_fn(hello_world))
});
// 创建服务
let server = Server::bind(&addr).serve(make_svc);
if let Err(e) = server.await {
eprintln!("service error: {}", e);
}
}
3.3.2、https/http请求代理
Cargo.toml
js
[dependencies]
hyper = {version = "0.14", features = ["full"]}
tokio = {version = "1", features = ["full"]}
hyper-rustls = "0.22"
futures-util = "0.3"
anyhow = "1.0.37"
main.rs
js
use anyhow::*;
use std::sync::Arc;
use std::net::SocketAddr;
use hyper::{Body, Request, Server, Client};
use hyper::service::{make_service_fn, service_fn};
// hyper-rustls = "0.22" 版本过高不行
#[tokio::main]
async fn main() -> Result<()> {
let https = hyper_rustls::HttpsConnector::with_native_roots();
let client = Client::builder().build::<_, hyper::Body>(https);
let client = Arc::new(client);
let addr = SocketAddr::from(([0, 0, 0, 0], 7000));
let make_svc = make_service_fn(move |_conn| {
let client = Arc::clone(&client);
async move {
Ok(service_fn(
move |mut req| {
let client = Arc::clone(&client);
async move {
println!("proxy: {}", req.uri().path());
proxy_crate(&mut req)?;
client.request(req).await.context("proxy request")
}
}
))
}
});
let _server = Server::bind(&addr).serve(make_svc).await.context("Run server");
Ok(())
}
fn proxy_crate(req: &mut Request<Body>) -> Result<()> {
for key in &["content-length", "accept-encoding", "content-encoding", "transfer-encoding"] {
req.headers_mut().remove(*key);
}
let uri = req.uri();
let uri_string = match uri.query() {
Some(query_item) => format!("https://crates.io{}?{}", uri.path(), query_item),
None => format!("https://crates.io{}", uri.path()),
};
*req.uri_mut() = uri_string.parse().context("Parsing URI Error")?;
Ok(())
}
3.3.3、api
js
use anyhow::{Result, Error};
use futures_util::{future::ok, TryStreamExt};
use hyper::{service::{make_service_fn, service_fn}, Request, Body, Server, Response, StatusCode, body, Method};
#[tokio::main]
async fn main() -> Result<()> {
let addr = "127.0.0.1:3000".parse().unwrap();
let service = make_service_fn(|_| async {
Ok::<_, Error>(service_fn(echo))
});
let server = Server::bind(&addr).serve(service);
println!("Listening on http://{}", addr);
server.await?;
Ok(())
}
async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") => Ok(Response::new(Body::from( "Try POSTing data to /echo such as: `curl localhost:3000/echo -XPOST -d 'hello world'`"))),
(&Method::POST, "/echo") => {
Ok(Response::new(req.into_body()))
},
(&Method::POST, "/echo/uppercase") => {
let chunk_stream = req.into_body().map_ok(|chunk| {
chunk.iter().map(|c| c.to_ascii_uppercase()).collect::<Vec<u8>>()
});
Ok(Response::new(Body::wrap_stream(chunk_stream)))
},
(&Method::POST, "/echo/reversed") => {
let body = body::to_bytes(req.into_body()).await?;
let reversed_body = body.iter().rev().cloned().collect::<Vec<u8>>();
Ok(Response::new(Body::from(reversed_body)))
},
_ => {
let mut not_found = Response::default();
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
},
}
}
3.3.4、http代理
js
use std::net::SocketAddr;
use anyhow::{Error, Result, Context, Ok};
use futures_util::TryStreamExt;
use hyper::{
body,
service::{make_service_fn, service_fn},
Body, Client, Method, Request, Response, Server, StatusCode,
};
#[tokio::main]
async fn main() -> Result<()> {
let addr: SocketAddr = "127.0.0.1:3000".parse()?;
let out_addr: SocketAddr = "127.0.0.1:60244".parse()?;
let client = Client::new();
let service = make_service_fn(move |_| {
let client = client.clone();
let out_addr_clone = out_addr.clone();
async move {
Ok(service_fn(move |mut req| {
let uri_string = format!(
"http://{}{}",
out_addr_clone,
req.uri()
.path_and_query()
.map(|x| x.as_str())
.unwrap_or("/")
);
let uri = uri_string.parse().unwrap();
*req.uri_mut() = uri;
client.request(req)
}))
}
});
let server = Server::bind(&addr).serve(service);
println!("Listening on http://{}", addr);
server.await?;
Ok(())
}
3.4、Client
3.4.1、简单示例
js
use anyhow::*;
use hyper::Client;
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse()?;
let resp = client.get(uri).await?;
println!("Response: {}--{:?}", resp.status(), resp.body());
Ok(())
}
3.4.2、打印body
js
use anyhow::Result;
use hyper::{Client, body::HttpBody};
use tokio::io::{stdout, AsyncWriteExt as _};
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse()?;
let mut resp = client.get(uri).await?;
println!("{}", resp.status());
while let Some(chunk) = resp.body_mut().data().await {
// println!("{:?}", chunk?);
stdout().write_all(&chunk?).await?;
}
Ok(())
}
3.4.3、post
js
use anyhow::Result;
use hyper::{Client, Request, Body, Method};
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::new();
let req = Request::builder()
.method(Method::POST)
.uri("http://httpbin.org/post")
.header("content-type", "application/json")
.body(Body::from(r#"{"key":"value"}"#))?;
let resp = client.request(req).await?;
// body -> Bytes
let res = hyper::body::to_bytes(resp.into_body()).await?;
// bytes -> String
println!("body: {:?}", String::from_utf8_lossy(&res));
Ok(())
}
3.4.4、body to json
js
use anyhow::Result;
use hyper::{Client, body::Buf};
use serde::Deserialize;
#[derive(Deserialize, Debug)]
struct User {
id: i32,
name: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::new();
let uri = "http://jsonplaceholder.typicode.com/users".parse()?;
let res = client.get(uri).await?;
let body = hyper::body::aggregate(res).await?;
let users: Vec<User> = serde_json::from_reader(body.reader())?;
for user in users {
println!("id: {}, name: {}", user.id, user.name);
}
Ok(())
}
4、anyhow(异常捕捉)
4.1、安装
js
[dependencies]
anyhow = "1.0.75"
4.2、使用
4.2.1、Result
js
use std::fs::File;
use anyhow::Result;
fn main() -> Result<()> {
File::open("Cargo1.toml")?;
Ok(())
}
4.2.2、Context
js
use anyhow::{Result, Context};
use hyper::Client;
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::new();
let uri = "http1://httpbin.org/ip".parse()?;
let resp = client.get(uri).await.context("context")?;
println!("Response: {}--{:?}", resp.status(), resp.body());
Ok(())
}
4.2.3、with_context
js
use std::fs;
use anyhow::{Result, Context};
fn main() -> Result<()> {
let path = "Cargo1.toml";
let content = fs::read(path)
.with_context(|| format!("Failed to read instrs from {}", path))?;
println!("{:?}", content);
Ok(())
}
5、reqwest(请求库)
5.1、安装
js
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
5.2、get请求
js
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let resp = reqwest::get("https://httpbin.org/ip")
.await?
.json::<HashMap<String, String>>()
.await?;
println!("{:#?}", resp);
Ok(())
}
5.3、post请求
Cargo.toml
js
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
js
use serde::{Serialize, Deserialize};
use serde_json::json;
#[derive(Debug, Serialize, Deserialize)]
struct User {
userId: u16,
id: u16,
title: String,
body: String
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let post = client
.post("http://jsonplaceholder.typicode.com/posts")
.json(&json!({
"userId": 1,
"title": "foo",
"body": "bar"
}))
.send()
.await?;
let resp = post.json::<User>().await?;
println!("{:#?}", resp);
Ok(())
}
6、structopt(命令行参数转结构体)
6.1、安装
js
[dependencies]
structopt = "0.3"
6.2、使用
js
use std::path::PathBuf;
use structopt::StructOpt;
/// A basic example
#[derive(StructOpt, Debug)]
#[structopt(name = "basic")] // 命令行的名字
struct Opt {
/// Activate debug mode
#[structopt(short, long)]
debug: bool,
/// Set speed
// we don't want to name it "speed", need to look smart
#[structopt(short = "s", long = "speed", default_value = "42")]
speed: f64,
/// Input file
#[structopt(parse(from_os_str))]
input: PathBuf,
/// Output file, stdout if not present
#[structopt(parse(from_os_str))]
output: Option<PathBuf>,
/// Output file, stdout if not present
#[structopt(name = "file", parse(from_os_str))]
output2: Option<PathBuf>,
/// Where to write the output: to `stdout` or `file`
#[structopt(short)]
out_type: String,
}
fn main() {
let opt = Opt::from_args();
println!("{:#?}", opt);
}
7、egui(异步库)
7.1、安装
js
[dependencies]
eframe = "0.23"
egui_extras = { version = "0.23", features = ["all_loaders"] }
image = { version = "0.24", features = ["jpeg", "png"] }
7.2、使用
7.2.1、基础控件
js
use eframe::egui;
#[derive(Default)]
struct MyApp {
name: String,
age: u32,
}
impl eframe::App for MyApp {
fn update(&mut self, ctx: &egui::Context, frame: &mut eframe::Frame) {
egui::CentralPanel::default().show(ctx, |ui| {
ui.heading("My egui Application");
ui.horizontal(|ui| {
let name_label = ui.label("Your name: ");
ui.text_edit_singleline(&mut self.name)
.labelled_by(name_label.id);
});
ui.add(egui::Slider::new(&mut self.age, 0..=120).text("age"));
if ui.button("Click each year").clicked() {
self.age += 1;
}
ui.label(format!("Hello '{}', age {}", self.name, self.age));
ui.image(egui::include_image!(
"../assets/logo.png"
));
});
}
}
fn main() -> Result<(), eframe::Error>{
let options = eframe::NativeOptions {
initial_window_size: Some(egui::vec2(640.0, 600.0)),
..Default::default()
};
eframe::run_native(
"My egui App",
options,
Box::new(|cc| {
egui_extras::install_image_loaders(&cc.egui_ctx);
Box::new(MyApp::default())
}),
)
}
7.2.2、confirm确认框
js
use eframe::egui;
#[derive(Default)]
struct MyApp {
allowed_to_close: bool,
show_confirmation_dialog: bool,
}
impl eframe::App for MyApp {
fn on_close_event(&mut self) -> bool {
self.show_confirmation_dialog = true;
self.allowed_to_close
}
fn update(&mut self, ctx: &egui::Context, frame: &mut eframe::Frame) {
egui::CentralPanel::default().show(ctx, |ui| {
ui.heading("My egui Application");
});
if self.show_confirmation_dialog {
egui::Window::new("Do you want to quit?")
.collapsible(false)
.resizable(false)
.show(ctx, |ui| {
ui.horizontal(|ui| {
if ui.button("Cancel").clicked() {
self.show_confirmation_dialog = false;
}
if ui.button("Yes!").clicked() {
self.allowed_to_close = true;
frame.close();
}
});
});
}
}
}
fn main() -> Result<(), eframe::Error>{
let options = eframe::NativeOptions {
initial_window_size: Some(egui::vec2(640.0, 600.0)),
..Default::default()
};
eframe::run_native(
"My egui App",
options,
Box::new(|cc| {
Box::new(MyApp::default())
}),
)
}
7.2.3、键盘事件
js
impl eframe::App for Content {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
egui::CentralPanel::default().show(ctx, |ui| {
ui.heading("Press/Hold/Release example. Press A to test.");
if ui.button("Clear").clicked() {
self.text.clear();
}
egui::ScrollArea::vertical()
.auto_shrink([false; 2])
.stick_to_bottom(true)
.show(ui, |ui| {
ui.label(&self.text);
});
if ctx.input(|i| i.key_pressed(egui::Key::A)) {
self.text.push_str("\nPressed");
}
if ctx.input(|i| i.key_down(egui::Key::A)) {
self.text.push_str("\nHeld");
ui.ctx().request_repaint(); // make sure we note the holding.
}
if ctx.input(|i| i.key_released(egui::Key::A)) {
self.text.push_str("\nReleased");
}
});
}
}
8、drive_builder
8.1、安装
js
[dependencies]
drive_builder = "0.1"
8.2、使用
8.2.1、基本使用
js
let mut ch = ChannelBuilder::default();
ch.ipsum("world");
let ch = ch.build().unwrap();
println!("{:?}", ch);
8.2.2、setter
js
use derive_builder::Builder;
#[derive(Builder, Debug)]
struct Channel {
#[builder(setter(into, strip_option))]
ipsum: Option<String>,
#[builder(setter(into, strip_option), default)]
foo: Option<String>,
}
fn main() {
let ch = ChannelBuilder::default()
.ipsum("hello")
.build()
.unwrap();
println!("{:?}", ch);
}
8.2.3、重命名builder
js
use derive_builder::Builder;
#[derive(Builder, Debug)]
#[builder(name = "FooConstructor")]
struct Channel {
#[builder(setter(into, strip_option))]
ipsum: Option<String>,
#[builder(setter(into, strip_option), default)]
foo: Option<String>,
#[builder(setter(skip))] // 跳过该属性,会使用默认值
setter_skipped: String,
}
fn main() {
let mut ch = FooConstructor::default();
ch.ipsum("world");
// ch.setter_skipped("heee".to_string());
let ch = ch.build().unwrap();
println!("{:?}", ch);
}