翻译自:Create an async CRUD web service in Rust with warp
转载请注明出处:http://www.telihai.com/archives/9161/
在此博客的上一篇文章中,我们介绍了如何使用 Actix 和 Diesel 创建 Rust Web 服务。这次,我们将使用 warp Web 框架和 tokio-postgres 创建一个轻量级的,完全异步的 Web 服务。
Warp 基于著名并久经考验的 hyper HTTP 库,它提供了强大而快速的基础。warp 的另一个很酷的功能是它的过滤器系统,这是一种构成 Web 应用程序的功能方法。
本质上,过滤器只是可以组合在一起的功能。在扭曲中,它们用于从路由,中间件到将值传递给处理程序的所有过程。请参阅 warp 的发布帖子,以进行更深入的了解。在本教程中,您将看到一些实际使用的过滤器,并且我们将演示如何编写自己的过滤器。
接下来,您所需要的只是一个相当新的 Rust 安装(1.39+)和一种运行 Postgres 数据库的方法(例如 Docker)。
首先,创建您的测试项目。
cargo new warp-example
cd warp-example
接下来,编辑 Cargo.toml 文件并添加所需的依赖项。
[dependencies]
tokio = { version = "0.2", features = ["macros"] }
warp = "0.2"
mobc-postgres = { version = "0.5", features = ["with-chrono-0_4"] }
mobc = "0.5"
serde = {version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0"
thiserror = "1.0"
chrono = { version = "0.4", features = ["serde"] }
如果您想知道所有这些意味着什么:
tokio
是我们的异步运行时,我们需要执行期货warp
是我们的网络框架mobc / mobc-postgres
代表数据库连接的异步连接池serde
用于序列化和反序列化对象(例如 to/from JSON)thiserror
是我们将用于错误处理的实用程序库chrono
代表时间和日期实用程序为了避免将所有内容都转储到一个文件中,让我们在中添加一些结构 main.rs
。
mod data;
mod db;
mod error;
mod handler;
对于每个模块,我们还将创建一个文件(例如 data.rs
)。
第一步,创建一个运行在端口 8000 上的 Web 服务器,其 /health
端点返回一个 200 OK
。
在 main.rs
中添加:
#[tokio::main]
async fn main() {
let health_route = warp::path!("health")
.map(|| StatusCode::OK);
let routes = health_route
.with(warp::cors().allow_any_origin());
warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}
在上面的代码段中,我们定义了匹配 GET /health
并返回 200 OK
的 health_route
。然后,为演示如何添加中间件,使用 warp::cors
过滤器设置此路由,以允许从任何来源调用该服务。最后,用 warp::serve
运行服务。
使用 cargo run
和 cURL
启动并测试应用程序。
curl http://localhost:8000/health
到目前为止,一切都很好!下一步是设置您的 Postgres 数据库,并在 /health
handler 中添加对数据库连接的检查。
要启动 Postgres 数据库,可以使用 Docker 或本地安装 Postgres。使用 Docker,您可以简单地执行:
docker run -p 7878:5432 -d postgres:9.6.12
此命令在 7878 端口用 postgres
用户名启动一个 Postgres 数据库 postgres
,无密码。
现在您有了一个正在运行的数据库,下一步是从 warp 应用程序与该数据库进行连接。为此,您可以使用异步连接池 mobc 产生多个数据库连接,并在请求之间重用它们。
进行设置仅需几行。首先,在中定义一些便利类型 main.rs
。
use mobc::{Connection, Pool};
use mobc_postgres::{tokio_postgres, PgConnectionManager};
use tokio_postgres::NoTls;
type DBCon = Connection<PgConnectionManager<NoTls>>;
type DBPool = Pool<PgConnectionManager<NoTls>>;
下一步,在 db.rs
中创建连接池。
use crate::{DBCon, DBPool};
use mobc_postgres::{tokio_postgres, PgConnectionManager};
use tokio_postgres::{Config, Error, NoTls};
use std::fs;
use std::str::FromStr;
use std::time::Duration;
const DB_POOL_MAX_OPEN: u64 = 32;
const DB_POOL_MAX_IDLE: u64 = 8;
const DB_POOL_TIMEOUT_SECONDS: u64 = 15;
pub fn create_pool() -> std::result::Result<DBPool, mobc::Error<Error>> {
let config =
Config::from_str("postgres://postgres@127.0.0.1:7878/postgres")?;
let manager = PgConnectionManager::new(config, NoTls);
Ok(Pool::builder()
.max_open(DB_POOL_MAX_OPEN)
.max_idle(DB_POOL_MAX_IDLE)
.get_timeout(Some(Duration::from_secs(DB_POOL_TIMEOUT_SECONDS)))
.build(manager))
}
该 create_pool
函数仅创建一个 Postgres 连接字符串,并为连接池定义一些参数,例如最小和最大打开连接以及连接超时。
下一步是简单地构建池并返回它。此时,实际上没有创建任何数据库连接,仅创建了池。
由于我们已经在这里,所以我们还创建一个用于在启动时初始化数据库的函数。
const INIT_SQL: &str = "./db.sql";
pub async fn get_db_con(db_pool: &DBPool) -> Result<DBCon> {
db_pool.get().await.map_err(DBPoolError)
}
pub async fn init_db(db_pool: &DBPool) -> Result<()> {
let init_file = fs::read_to_string(INIT_SQL)?;
let con = get_db_con(db_pool).await?;
con.batch_execute(init_file.as_str()).await.map_err(DBInitError)?;
Ok(())
}
使用该 get_db_con
utility,我们尝试从池中获取新的数据库连接。现在不用担心错误——我们稍后将讨论错误处理。
要在启动时从 db.sql
文件创建数据库表,要调用 init_db
函数。这会将文件读入字符串并执行 query。
初始化查询如下所示:
CREATE TABLE IF NOT EXISTS todo
(
id SERIAL PRIMARY KEY NOT NULL,
name VARCHAR(255),
created_at timestamp with time zone DEFAULT (now() at time zone 'utc'),
checked boolean DEFAULT false
);
回到我们的主函数,现在我们可以调用数据库设置函数。
let db_pool = db::create_pool().expect("database pool can be created");
db::init_db(&db_pool).await.expect("database can be initialized");
如果任何数据库设置代码失败,我们可以抛出 panic
,因为继续下去没有意义。
假设它没有失败,那么现在是时候解决本节的主要目标:向 /health
handler 添加数据库检查。
为此,我们需要一种将传递 db_pool
给 handler 的方法。这是写我们的第一个 warp filter
的绝好机会。
在 main,rs
中,添加以下 with_db
过滤器。
use std::convert::Infallible;
use warp::{Filter, Rejection};
fn with_db(
db_pool: DBPool
) -> impl Filter<Extract = (DBPool,), Error = Infallible> + Clone {
warp::any().map(move || db_pool.clone())
}
这是一个简单的提取过滤器。上面的意思是,对于任何 route(any()
) ,您都希望提取一个 DBPool
并将其传递。
如果您有兴趣了解有关过滤器的更多信息,文档 会很有帮助。
然后使用 .and()
操作符将过滤器简单地添加到 handler 定义中:
let health_route = warp::path!("health")
.and(with_db(db_pool.clone()))
.and_then(handler::health_handler);
移动 health handler 到 handler.rs
文件并添加数据库检查。
use crate::{db, DBPool};
use warp::{http::StatusCode, reject, Reply, Rejection};
pub async fn health_handler(
db_pool: DBPool
) -> std::result::Result<impl Reply, Rejection> {
let db = db::get_db_con(&db_pool)
.await
.map_err(|e| reject::custom(e))?;
db.execute("SELECT 1", &[])
.await
.map_err(|e| reject::custom(DBQueryError(e)))?;
Ok(StatusCode::OK)
}
现在,handler 收到一个 DBPool
,您可以使用它来建立连接并对数据库发起健全性检查查询。
如果检查期间发生错误,使用 reject::custom
来返回自定义错误。
接下来,按照承诺,让我们看一下使用 warp 进行错误处理。
干净的错误处理是任何 Web 应用程序中最重要且经常被忽视的事情之一。目的是在不泄漏内部细节的情况下为 API 使用者提供有用的错误信息。
我们将使用 thiserror 库方便地创建自定义错误。
在 error.rs
中定义一个 Error
枚举,该枚举具有所有错误的变体。
use mobc_postgres::tokio_postgres;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("error getting connection from DB pool: {0}")]
DBPoolError(mobc::Error<tokio_postgres::Error>),
#[error("error executing DB query: {0}")]
DBQueryError(#[from] tokio_postgres::Error),
#[error("error creating table: {0}")]
DBInitError(tokio_postgres::Error),
#[error("error reading file: {0}")]
ReadFileError(#[from] std::io::Error),
}
如果我们可以找到将这些错误和其他错误转换为有意义的 API 响应的方法,则可以简单地从处理程序返回我们的自定义错误之一,而 caller 将自动获得正确的错误消息和状态代码。
为此,我们将使用 warp 的概念 rejections
。
首先,将便利类型添加到 main.rs
for fallible results。
type Result<T> = std::result::Result<T, warp::Rejection>;
接下来,通过实现 Reject
trait,确保通过 warp 将您的自定义错误识别为 rejections。
impl warp::reject::Reject for Error {}
定义拒绝处理程序,以以下形式将 rejections 转换为良好的错误响应。
#[derive(Serialize)]
struct ErrorResponse {
message: String,
}
这样的拒绝处理程序可能看起来像这样:
pub async fn handle_rejection(
err: Rejection
) -> std::result::Result<impl Reply, Infallible> {
let code;
let message;
if err.is_not_found() {
code = StatusCode::NOT_FOUND;
message = "Not Found";
} else if let Some(_) =
err.find::<warp::filters::body::BodyDeserializeError>()
{
code = StatusCode::BAD_REQUEST;
message = "Invalid Body";
} else if let Some(e) = err.find::<Error>() {
match e {
Error::DBQueryError(_) => {
code = StatusCode::BAD_REQUEST;
message = "Could not Execute request";
}
_ => {
eprintln!("unhandled application error: {:?}", err);
code = StatusCode::INTERNAL_SERVER_ERROR;
message = "Internal Server Error";
}
}
} else if let Some(_) = err.find::<warp::reject::MethodNotAllowed>() {
code = StatusCode::METHOD_NOT_ALLOWED;
message = "Method Not Allowed";
} else {
eprintln!("unhandled error: {:?}", err);
code = StatusCode::INTERNAL_SERVER_ERROR;
message = "Internal Server Error";
}
let json = warp::reply::json(&ErrorResponse {
message: message.into(),
});
Ok(warp::reply::with_status(json, code))
}
基本上,我们从处理程序那里得到一个 Rejection
。然后,根据错误的类型,我们设置响应的消息和状态代码。
如您所见,我们既可以处理一般错误,例如 not found
,也可以处理特定问题,例如解析 request 的 JSON body 时遇到的错误。
后备处理程序将一般 500 错误返回给用户并记录错误原因,因此您可以在必要时进行调查而不会泄漏内部信息。
在路由定义中,只需使用 recover
filter 添加这个 error handler:
let routes = health_route
.with(warp::cors().allow_any_origin())
.recover(error::handle_rejection);
完美!我们已经取得了很大的进步。剩下的就是为待办事项应用程序实际实现 CRUD 处理程序。
现在,我们有一个运行并连接到数据库的 Web 服务器,以及一种优雅地处理错误的方法。我们的应用程序唯一缺少的是实际的应用程序逻辑。
我们将实现四个处理程序:
GET /todo/?search={searchString}
列出所有待办事项,并通过可选的搜索字符串过滤POST /todo/
创建一个待办事项PUT /todo/{id}
用给定的 ID 更新待办事项DELETE /todo/{id}
删除具有给定 ID 的待办事项第一步是创建待办事项,因为如果没有待办事项,我们将无法方便地测试其他端点。
在 db.rs
中,添加用于将待办事项插入数据库的功能。
const TABLE: &str = "todo";
pub async fn create_todo(db_pool: &DBPool, body: TodoRequest) -> Result<Todo> {
let con = get_db_con(db_pool).await?;
let query = format!("INSERT INTO {} (name) VALUES ($1) RETURNING *", TABLE);
let row = con
.query_one(query.as_str(), &[&body.name])
.await
.map_err(DBQueryError)?;
Ok(row_to_todo(&row))
}
fn row_to_todo(row: &Row) -> Todo {
let id: i32 = row.get(0);
let name: String = row.get(1);
let created_at: DateTime<Utc> = row.get(2);
let checked: bool = row.get(3);
Todo {
id,
name,
created_at,
checked,
}
}
这将从连接池确立一个连接,发送插入查询,并将返回的行转换为 Todo
。
为此,您需要一些在 data.rs
中定义一些数据对象。
use chrono::prelude::*;
use serde_derive::{Deserialize, Serialize};
#[derive(Deserialize)]
pub struct Todo {
pub id: i32,
pub name: String,
pub created_at: DateTime<Utc>,
pub checked: bool,
}
#[derive(Deserialize)]
pub struct TodoRequest {
pub name: String,
}
#[derive(Deserialize)]
pub struct TodoUpdateRequest {
pub name: String,
pub checked: bool,
}
#[derive(Serialize)]
pub struct TodoResponse {
pub id: i32,
pub name: String,
pub checked: bool,
}
impl TodoResponse {
pub fn of(todo: Todo) -> TodoResponse {
TodoResponse {
id: todo.id,
name: todo.name,
checked: todo.checked,
}
}
}
该 Todo
结构实质上是数据库表的镜像。tokio-postgres
可以使用 chrono 的 DateTime<Utc>
来映射转换时间戳。其他结构是您期望用于创建和更新待办事项的 JSON 请求,以及您在 list、update 和 create 处理程序中发送回的响应。
现在,您可以在 handler.rs
中创建实际的创建处理程序。
pub async fn create_todo_handler(
body: TodoRequest, db_pool: DBPool
) -> Result<impl Reply> {
Ok(json(&TodoResponse::of(
db::create_todo(&db_pool, body).await.map_err(|e| reject::custom(e))?,
)))
}
在这种情况下,您会同时得到传递给 handler 的一个从 request body 解析的 TodoRequest
和 db_pool
。到达那里后,只需调用数据库函数,将其映射到 TodoResponse
,然后使用 warp 的 reply::json
助手将其序列化为 JSON。
如果发生错误,请使用warp’s处理它,这使您可以根据我们的自定义错误类型创建拒绝项。reject::custom
唯一缺少的是 main.rs
中的路由定义。
let todo = warp::path("todo");
let todo_routes = todo
.and(warp::post())
.and(warp::body::json())
.and(with_db(db_pool.clone()))
.and_then(handler::create_todo_handler));
let routes = health_route
.or(todo_routes)
.with(warp::cors().allow_any_origin())
.recover(error::handle_rejection);
You’ll use warp::path
at /todo/
for several routes. 然后,使用 warp 的过滤器,组成您的创建处理程序。
添加 post
方法,明确说明您需要一个 JSON body,然后使用 with_db
过滤器表示需要数据库访问权限。最后,通过告诉路由使用哪个处理程序来完成该操作。
然后,所有这些都将与 or
操作符一起传递到路由。
使用以下命令对其进行测试。
curl -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"name": "Some Todo"}'
{"id":1,"name":"Some Todo","checked":false}
太棒了!现在您知道了它的工作原理,您可以一次完成其他三个处理程序。同样,首先添加数据库助手。
pub async fn fetch_todos(
db_pool: &DBPool, search: Option<String>
) -> Result<Vec<Todo>> {
let con = get_db_con(db_pool).await?;
let where_clause = match search {
Some(_) => "WHERE name like $1",
None => "",
};
let query = format!(
"SELECT {} FROM {} {} ORDER BY created_at DESC",
SELECT_FIELDS, TABLE, where_clause
);
let q = match search {
Some(v) => con.query(query.as_str(), &[&v]).await,
None => con.query(query.as_str(), &[]).await,
};
let rows = q.map_err(DBQueryError)?;
Ok(rows.iter().map(|r| row_to_todo(&r)).collect())
}
pub async fn update_todo(
db_pool: &DBPool,
id: i32,
body: TodoUpdateRequest
) -> Result<Todo> {
let con = get_db_con(db_pool).await?;
let query = format!(
"UPDATE {} SET name = $1, checked = $2 WHERE id = $3 RETURNING *",
TABLE
);
let row = con
.query_one(query.as_str(), &[&body.name, &body.checked, &id])
.await
.map_err(DBQueryError)?;
Ok(row_to_todo(&row))
}
pub async fn delete_todo(db_pool: &DBPool, id: i32) -> Result<u64> {
let con = get_db_con(db_pool).await?;
let query = format!("DELETE FROM {} WHERE id = $1", TABLE);
con.execute(query.as_str(), &[&id]).await.map_err(DBQueryError)
}
这些基本与 create
情况相同,不同之处在于 fetch_todos
,如果有搜索词,您将创建一个不同的查询。
接下来让我们看一下处理程序。
#[derive(Deserialize)]
pub struct SearchQuery {
search: Option<String>,
}
pub async fn list_todos_handler(
query: SearchQuery,
db_pool: DBPool
) -> Result<impl Reply> {
let todos = db::fetch_todos(&db_pool, query.search)
.await
.map_err(|e| reject::custom(e))?;
Ok(json::<Vec<_>>(
&todos.into_iter().map(|t| TodoResponse::of(t)).collect(),
))
}
pub async fn update_todo_handler(
id: i32,
body: TodoUpdateRequest,
db_pool: DBPool,
) -> Result<impl Reply> {
Ok(json(&TodoResponse::of(
db::update_todo(&db_pool, id, body)
.await
.map_err(|e| reject::custom(e))?,
)))
}
pub async fn delete_todo_handler(
id: i32,
db_pool: DBPool
) -> Result<impl Reply> {
db::delete_todo(&db_pool, id).await.map_err(|e| reject::custom(e))?;
Ok(StatusCode::OK)
}
同样,您会看到一些熟悉的东西。如果一切顺利,每个处理程序都会调用数据库层,处理错误,并为调用方创建一个返回值。
一个有趣的例外是 list_todos_handler
,前面提到的查询参数已传入,已经解析为 SearchQuery
。
这就是您处理 warp 中的查询参数的方式。如果您有更多具有不同类型的参数,则只需将它们添加到 SearchQuery
结构中,它们就会被自动解析。
让我们进行所有连接,然后进行一项最终测试。
let todo_routes = todo
.and(warp::get())
.and(warp::query())
.and(with_db(db_pool.clone()))
.and_then(handler::list_todos_handler)
.or(todo
.and(warp::post())
.and(warp::body::json())
.and(with_db(db_pool.clone()))
.and_then(handler::create_todo_handler))
.or(todo
.and(warp::put())
.and(warp::path::param())
.and(warp::body::json())
.and(with_db(db_pool.clone()))
.and_then(handler::update_todo_handler))
.or(todo
.and(warp::delete())
.and(warp::path::param())
.and(with_db(db_pool.clone()))
.and_then(handler::delete_todo_handler));
这里有一些新东西。要将查询参数获取到 list handler,您需要使用 warp::query()
。要获取 id
参数用于 update 和 delete,请使用 warp::path::param()
。使用 or
将不同的路由合并,即可设置 todo 路由。
在 warp 中创建和构造路由的方法有很多。它只是被组合在一起的函数,因此处理过程非常灵活。有关更多示例,请查看官方文档。
现在让我们测试整个事情。
首先,查看错误处理是否真正起作用。
curl -v -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"wrong": "Some Todo"}'
HTTP/1.1 400 Bad Request
{"message":"Invalid Body"}
接下来,添加另一个 Todo
,立即将其 check,然后尝试更新不存在的 Todo
。
curl -X POST 'http://localhost:8000/todo/' -H 'Content-Type: application/json' -d '{"name": "Done Todo"}'
{"id":2,"name":"Done Todo","checked":false}
curl -X PUT 'http://localhost:8000/todo/2' -H 'Content-Type: application/json' -d '{"name": "Done Todo", "checked": true}'
{"id":2,"name":"Done Todo","checked":true}
curl -X PUT 'http://localhost:8000/todo/2000' -H 'Content-Type: application/json' -d '{"name": "Done Todo", "checked": true}'
{"message":"Could not Execute request"}
到目前为止,一切都很好!现在列出它们,过滤列表,删除其中之一,然后再次列出。
curl -X GET 'http://localhost:8000/todo/' -H 'Content-Type: application/json'
[{"id":1,"name":"Some Todo","checked":false},{"id":2,"name":"Done Todo","checked":true}]
curl -X GET 'http://localhost:8000/todo/?search=Done%20Todo' -H 'Content-Type: application/json'
[{"id":2,"name":"Done Todo","checked":true}]
curl -v -X DELETE 'http://localhost:8000/todo/2' -H 'Content-Type: application/json'
HTTP/1.1 200 OK
curl -X GET 'http://localhost:8000/todo/' -H 'Content-Type: application/json'
[{"id":1,"name":"Some Todo","checked":false}]
完善!一切正常。您可以在 GitHub 上找到此示例的完整代码。
太好了!在本教程中,我们演示了如何使用 warp 和 tokio-postgres 创建完全异步的 Web 应用程序。我们设法在 300 行以下的 Rust 代码中对错误进行了错误处理,从而获得了基于数据库的基本 CRUD Web 服务。不是太糟糕!
warp 似乎很有前途;它轻巧,现代,快速,我喜欢功能方法。该框架还很年轻,还没有经受住时间的考验,但是到目前为止,它看起来很棒。