使用 Rust 实现 Web 服务器
在开始实现 web 服务器之前,首先要搞懂什么是同步,异步模型。
生活中简单的同步,异步模型
假如你是个饭店老板,新店开业,你招聘厨师按照菜谱制作番茄鸡蛋面。
制作顺序如下:
- 准备阶段 (30s)
- 烧水煮面 (5min)
- 炒番茄鸡蛋做浇头(30s)
- 浇给
小店开业,只招收一名厨师,这名厨师严格按照菜谱顺序制作。总计花费时间 6min 左右。好的,这个菜品爆了,越来越多的人来店里吃它。计算一下一天的营业额,总共卖出 240 份。 聪明的你,不慌,看我大力出奇迹,多招了 3 名厨师,同时开干,一天总共可以卖出 960 份了,但是你却没有挣钱,因为挣得钱全用来招聘厨师上了。 你开始暴露资本家的嘴脸,开除了那 3 名厨师,开始监督厨师,不让厨师偷懒,在有多个锅同时在煮面的情况下,挑战极限,估算厨师出制作菜品的时间,准备阶段 (30s)+炒番茄鸡蛋做浇头(30s)+其他事项(60s)= 2min,一天的营业额 720 份。差不多是之前效率的 3 倍左右,所以资本家的良心不会痛吗。
通过现实生活的例子,可以在计算机的世界中找到应用场景,它就是 Web 服务器。Web 服务器中经常出现 I/O 等待,例如数据库查询等待,文件传输等待,当用户请求服务器时,如果因为 I/O 等待时间过长,而处理不了其他用户的请求,这个时候 CPU 资源利用率大大降低。以下源码可以在 rust-concurrency 项目中 examples 中找到。
单线程版
使用单线程来处理用户请求。 例如餐馆那个例子,厨师同一时间,只处理一个用户请求。
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
// 监听地址:127.0.0.1:8787
let listener = TcpListener::bind("127.0.0.1:8787").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
println!("Connection established!");
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader
.lines()
.next()
.unwrap_or_else(|| Ok(String::from("")))
.unwrap();
let (status_line, filename) = if request_line == "GET / HTTP/1.1" {
("HTTP/1.1 200 OK", "hello.html")
} else if request_line == "GET /sleep HTTP/1.1" {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
多线程版
使用线程池,4 个线程分别处理用户请求。 同时可以处理 4 个用户请求了,但是如果 4 个请求同时处理/sleep,线程池没有其他空余线程可用,导致用户请求等待。
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
use rust_concurrency::model::ThreadPool;
fn main() {
// 监听地址:127.0.0.1:8787
let listener = TcpListener::bind("127.0.0.1:8787").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
pool.execute(|| handle_connection(stream));
}
Err(e) => {
println!("Error: {}", e);
}
}
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader
.lines()
.next()
.unwrap_or_else(|| Ok(String::from("")))
.unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
异步版
使用 tokio 异步运行时,处理多用户请求。 io 出现等待时,挂起,线程去处理新的用户请求,服务器吞吐辆得到增强。
use std::fs;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 监听地址:127.0.0.1:8787
let listener = TcpListener::bind("127.0.0.1:8787").await?;
loop {
let (stream, _) = listener.accept().await?;
tokio::spawn(async move {
handle_connection(stream).await;
});
}
}
async fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader
.lines()
.next_line()
.await
.unwrap()
.unwrap_or_else(|| String::from(""));
println!("request_line = {}", request_line);
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
tokio::time::sleep(Duration::from_secs(10)).await;
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
let _ = stream.write_all(response.as_bytes()).await;
}
Web 服务器性能比较
本文档总结了三种不同 Rust 实现的 Web 服务器在两种不同场景下(CPU 密集型低延迟任务和 I/O 密集型高延迟任务)的基准测试结果。
测试使用 wrk
工具,命令结构如下: wrk -t4 -c100 -d10s <URL>
测试 1:CPU 密集型场景(提供小型本地文件)
本次测试通过请求一个小型 HTML 文件 (/
) 来对服务器进行基准测试。这模拟了低延迟、CPU 密集型的工作负载,其中服务器处理原始请求的速度是主要因素。
结果
服务器模型 | 请求数/秒 (RPS) | 平均延迟 | 备注 |
---|---|---|---|
单线程 | ~1600 | 6.33ms | 大量读取错误 |
多线程 | ~1606 | 2.63ms | 大量读取错误 |
异步 | ~1617 | 5.66ms | 大量读取错误 |
可视化 (RPS)
单线程:■■■■■■■■■■■■■■■■ (1600)
多线程:■■■■■■■■■■■■■■■■ (1606)
异步:■■■■■■■■■■■■■■■■ (1617)
分析
在这种场景下,所有三种模型在原始请求吞吐量方面表现相似。大量的 socket 读取错误表明,在这种高强度、低延迟的负载下,服务器难以跟上 wrk
激进的连接速率。由于此任务不涉及等待 I/O,因此本次测试未能突出各模型之间的关键差异。
测试 2:I/O 密集型场景(模拟慢速操作)
此测试旨在揭示不同并发模型的真正能力。我们对 /sleep
端点进行了基准测试,该端点强制服务器在响应前等待固定的时间。这模拟了真实的 I/O 操作,例如数据库查询或外部 API 调用。
结果
服务器模型 | 请求数/秒 (RPS) | 总请求数 (10 秒内) |
---|---|---|
单线程 | 0.20 | 2 |
多线程 | 0.79 | 8 |
异步 | 9.92 | 100 |
可视化(完成的总请求数)
单线程:■ (2)
多线程:■■■■ (8)
异步:■■■■■■■■■■■■■■■■■■■■■■■■■ (100)
分析
这里的结果截然不同,清晰地说明了每种方法的优缺点:
- 单线程: 它一次只能处理一个请求。当它忙于等待时,所有其他连接都超时了。在 10 秒的窗口内,它只能按顺序处理两个请求。
- 多线程: 拥有 4 个线程的线程池,它可以并发处理 4 个请求。5 秒后,这 4 个线程变为可用并处理接下来的 4 个请求,成功处理了总共 8 个请求。其容量严格受线程数量限制。
- 异步: 异步服务器在此处表现出色。当一个任务等待休眠时,它会交出控制权,允许底层线程立即开始处理其他请求。它成功地为所有 100 个并发连接启动了“等待”而没有阻塞,因此当它们的计时器到期时,完成了所有 100 个请求。这表明它在处理高并发 I/O 密集型工作负载方面具有卓越的能力,且资源消耗最少。
结论
虽然所有模型在简单、快速的 CPU 密集型任务中表现相似,但对于涉及 I/O 密集型操作的应用程序来说,异步模型是远远优越的,这也是 Web 服务最常见的场景。它提供了最高的吞吐量和最有效的资源利用。