Axum 使用 tracing-loki 实现 HTTP 请求日志的统一管理

日志的统一收集和管理是运维监控的重要环节。本文将详细介绍如何使用 Rust 的 tracing-lokitracing 以及 tower-http ,在 Cargo.toml 中开启相应的feature并导入(我就不筛选复制了)来构建一个完整的 HTTP 日志记录系统,实现日志的本地输出和远程 Loki 服务器同步发送。

系统架构概览

我们的日志系统包含以下几个核心组件:

  • tracing-subscriber: 日志订阅和分发的核心
  • tracing-loki: 将日志发送到 Grafana Loki 的适配器
  • tower-http TraceLayer: HTTP 请求的中间件追踪
  • 多层过滤器: 不同输出目标的独立日志级别控制

第一步:配置 tracing-loki 日志传输

环境变量配置

首先,我们需要配置连接 Loki 服务器的相关参数:

export LOKI_KEY="my-key"
export LOKI_VALUE="my-axum"
export LOKI_ORG="my-organization"
export LOKI_URL="http://localhost:3100"

初始化日志系统

pub fn init_logging() -> Result<(), Box<dyn std::error::Error>> {
    dotenvy::dotenv().ok(); 
    // 从环境变量读取 Loki 配置
    let loki_key = env::var("LOKI_KEY").expect("LOKI_KEY not set");
    let loki_val = env::var("LOKI_VALUE").expect("LOKI_VALUE not set");
    let loki_org = env::var("LOKI_ORG").expect("LOKI_ORG not set");
    let loki_url = env::var("LOKI_URL").expect("LOKI_URL not set");
    
    // 构建 Loki 层
    let (loki_layer, task) = tracing_loki::builder()
        .label(loki_key, loki_val)?  // 为所有日志添加服务标签
        .http_header("X-Scope-OrgID", loki_org)?  // 默认为 LOKI_ORG = "tenant1"
        .extra_field("pid", format!("{}", process::id()))?  // 添加进程ID
        .build_url(url::Url::parse(&loki_url).unwrap())?;
    
    // 配置不同输出的过滤器
    let loki_filter = EnvFilter::new("info");  // Loki 只接收 INFO 及以上级别
    let stdout_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("info,html5ever=off"));  // 过滤掉 html5ever 的噪音日志
    
    // 标准输出层 - JSON 格式便于后续处理
    let stdout_layer = tracing_subscriber::fmt::layer().json();
    
    // 组装所有层
    tracing_subscriber::registry()
        .with(loki_layer.with_filter(loki_filter))      // Loki 输出
        .with(stdout_layer.with_filter(stdout_filter))  // 控制台输出
        .init();
    
    // 启动后台任务处理 Loki 日志发送
    tokio::spawn(task);
    
    Ok(())
}

关键特性说明

  1. 独立过滤器: 每个输出层都有独立的过滤器,可以为本地调试和远程监控设置不同的日志级别
  2. 自动标签: 通过 .label() 为所有发送到 Loki 的日志自动添加服务标识
  3. 额外字段: .extra_field() 可以添加进程 ID、主机名等上下文信息
  4. 异步发送: 后台任务确保日志发送不会阻塞主业务逻辑

第二步:配置 HTTP 请求追踪

使用 tower-httpTraceLayer 可以自动为每个 HTTP 请求创建详细的追踪信息:


pub fn http_tracer() -> TraceLayer<SharedClassifier<ServerErrorsAsFailures>> {
    let classifier = SharedClassifier::new(ServerErrorsAsFailures::new());
    
    TraceLayer::new(classifier)
        // ═══ 为每个请求创建详细的 span ═══
        .make_span_with(|req: &Request<axum::body::Body>| {
            // 解析客户端真实 IP
            let peer = req
                .extensions()
                .get::<ConnectInfo<SocketAddr>>()
                .map(|c| c.0)
                .unwrap();
            let ip = resolve_client_ip(peer, &req.headers()); //你的真实IP提取
            let client_ip = ip.to_string();
            
            // 提取 User-Agent
            let ua = req
                .headers()
                .get(header::USER_AGENT)
                .and_then(|h| h.to_str().ok())
                .unwrap_or("");
            
            // 创建包含丰富信息的 span
            info_span!(
                "http",
                // ─── 请求信息 ───
                client_ip,
                method = %req.method(),
                host   = ?req.headers().get(header::HOST),
                path   = %req.uri().path(),
                query  = %req.uri().query().unwrap_or(""),
                ua,
                // ─── 响应信息 (稍后填充) ───
                status = tracing::field::Empty,
                latency_us = tracing::field::Empty,
            )
        })
        // ═══ 请求开始时记录 (DEBUG 级别) ═══
        .on_request(DefaultOnRequest::new().level(tracing::Level::DEBUG))
        // ═══ 响应完成时记录 (INFO 级别,包含延迟) ═══
        .on_response(
            DefaultOnResponse::new()
                .level(tracing::Level::INFO)
                .latency_unit(tower_http::LatencyUnit::Micros)
                .include_headers(true),  // 包含所有响应头
        )
        // ═══ 5xx 错误记录为 ERROR 级别 ═══
        .on_failure(|fail: tower_http::classify::ServerErrorsFailureClass, duration: Duration, span: &Span| {
            span.record("latency_us", &(duration.as_micros() as u64));
            tracing::error!(?fail, "request failed");
        })
}

第三步:集成到 Axum 应用

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 初始化日志系统
    init_logging()?;
    
    // 创建路由
    let app = Router::new()
        .route("/", get(handler)) 
        .layer(
            ServiceBuilder::new()
                .layer(http_tracer())  // 添加 HTTP 追踪中间件
                .into_inner()
        );
    
    // 启动服务器
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
    tracing::info!("Server starting on {}", addr);
    
    axum::Server::bind(&addr)
        .serve(app.into_make_service_with_connect_info::<SocketAddr>())
        .await?;
    
    Ok(())
}

无配置的日志发送机制

自动日志传输的工作原理

一旦完成上述配置,你的应用中任何使用 tracing 宏的日志都会自动发送到 Loki 服务器,无需额外配置:

// 这些日志会自动发送到 Loki
tracing::info!("用户登录成功", user_id = 12345);
tracing::warn!("数据库连接延迟较高", latency_ms = 850);

// 结构化日志也完全支持
tracing::info!(
    user_id = 67890,
    action = "file_upload",
    file_size = 1024000,
    "文件上传完成"
);

您可能感兴趣的文章

发现更多精彩内容