Axum 使用 tracing-loki 实现 HTTP 请求日志的统一管理
日志的统一收集和管理是运维监控的重要环节。本文将详细介绍如何使用 Rust 的 tracing-loki
、tracing
以及 tower-http
,在 Cargo.toml 中开启相应的feature并导入(我就不筛选复制了)来构建一个完整的 HTTP 日志记录系统,实现日志的本地输出和远程 Loki 服务器同步发送。
系统架构概览
我们的日志系统包含以下几个核心组件:
- tracing-subscriber: 日志订阅和分发的核心
- tracing-loki: 将日志发送到 Grafana Loki 的适配器
- tower-http TraceLayer: HTTP 请求的中间件追踪
- 多层过滤器: 不同输出目标的独立日志级别控制
第一步:配置 tracing-loki 日志传输
环境变量配置
首先,我们需要配置连接 Loki 服务器的相关参数:
export LOKI_KEY="my-key"
export LOKI_VALUE="my-axum"
export LOKI_ORG="my-organization"
export LOKI_URL="http://localhost:3100"
初始化日志系统
pub fn init_logging() -> Result<(), Box<dyn std::error::Error>> {
dotenvy::dotenv().ok();
let loki_key = env::var("LOKI_KEY").expect("LOKI_KEY not set");
let loki_val = env::var("LOKI_VALUE").expect("LOKI_VALUE not set");
let loki_org = env::var("LOKI_ORG").expect("LOKI_ORG not set");
let loki_url = env::var("LOKI_URL").expect("LOKI_URL not set");
let (loki_layer, task) = tracing_loki::builder()
.label(loki_key, loki_val)? .http_header("X-Scope-OrgID", loki_org)? .extra_field("pid", format!("{}", process::id()))? .build_url(url::Url::parse(&loki_url).unwrap())?;
let loki_filter = EnvFilter::new("info"); let stdout_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,html5ever=off"));
let stdout_layer = tracing_subscriber::fmt::layer().json();
tracing_subscriber::registry()
.with(loki_layer.with_filter(loki_filter)) .with(stdout_layer.with_filter(stdout_filter)) .init();
tokio::spawn(task);
Ok(())
}
关键特性说明
- 独立过滤器: 每个输出层都有独立的过滤器,可以为本地调试和远程监控设置不同的日志级别
- 自动标签: 通过
.label()
为所有发送到 Loki 的日志自动添加服务标识 - 额外字段:
.extra_field()
可以添加进程 ID、主机名等上下文信息 - 异步发送: 后台任务确保日志发送不会阻塞主业务逻辑
第二步:配置 HTTP 请求追踪
使用 tower-http
的 TraceLayer
可以自动为每个 HTTP 请求创建详细的追踪信息:
pub fn http_tracer() -> TraceLayer<SharedClassifier<ServerErrorsAsFailures>> {
let classifier = SharedClassifier::new(ServerErrorsAsFailures::new());
TraceLayer::new(classifier)
.make_span_with(|req: &Request<axum::body::Body>| {
let peer = req
.extensions()
.get::<ConnectInfo<SocketAddr>>()
.map(|c| c.0)
.unwrap();
let ip = resolve_client_ip(peer, &req.headers()); let client_ip = ip.to_string();
let ua = req
.headers()
.get(header::USER_AGENT)
.and_then(|h| h.to_str().ok())
.unwrap_or("");
info_span!(
"http",
client_ip,
method = %req.method(),
host = ?req.headers().get(header::HOST),
path = %req.uri().path(),
query = %req.uri().query().unwrap_or(""),
ua,
status = tracing::field::Empty,
latency_us = tracing::field::Empty,
)
})
.on_request(DefaultOnRequest::new().level(tracing::Level::DEBUG))
.on_response(
DefaultOnResponse::new()
.level(tracing::Level::INFO)
.latency_unit(tower_http::LatencyUnit::Micros)
.include_headers(true), )
.on_failure(|fail: tower_http::classify::ServerErrorsFailureClass, duration: Duration, span: &Span| {
span.record("latency_us", &(duration.as_micros() as u64));
tracing::error!(?fail, "request failed");
})
}
第三步:集成到 Axum 应用
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logging()?;
let app = Router::new()
.route("/", get(handler))
.layer(
ServiceBuilder::new()
.layer(http_tracer()) .into_inner()
);
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
tracing::info!("Server starting on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await?;
Ok(())
}
无配置的日志发送机制
自动日志传输的工作原理
一旦完成上述配置,你的应用中任何使用 tracing
宏的日志都会自动发送到 Loki 服务器,无需额外配置:
tracing::info!("用户登录成功", user_id = 12345);
tracing::warn!("数据库连接延迟较高", latency_ms = 850);
tracing::info!(
user_id = 67890,
action = "file_upload",
file_size = 1024000,
"文件上传完成"
);