risingwave_rt/
logger.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::env;
17use std::path::PathBuf;
18
19use either::Either;
20use fastrace_opentelemetry::OpenTelemetryReporter;
21use opentelemetry::InstrumentationScope;
22use opentelemetry::trace::{SpanKind, TracerProvider};
23use opentelemetry_otlp::SpanExporter;
24use opentelemetry_sdk::Resource;
25use opentelemetry_sdk::trace::TracerProviderBuilder;
26use risingwave_common::metrics::MetricsLayer;
27use risingwave_common::util::deployment::Deployment;
28use risingwave_common::util::env_var::env_var_is_true;
29use risingwave_common::util::query_log::*;
30use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn;
31use thiserror_ext::AsReport;
32use tracing::level_filters::LevelFilter as Level;
33use tracing_subscriber::filter::Targets;
34use tracing_subscriber::fmt::FormatFields;
35use tracing_subscriber::fmt::format::DefaultFields;
36use tracing_subscriber::fmt::time::OffsetTime;
37use tracing_subscriber::layer::SubscriberExt;
38use tracing_subscriber::prelude::*;
39use tracing_subscriber::{EnvFilter, filter, reload};
40
41pub struct LoggerSettings {
42    /// The name of the service. Used to identify the service in distributed tracing.
43    name: String,
44    /// Enable tokio console output.
45    enable_tokio_console: bool,
46    /// Enable colorful output in console.
47    colorful: bool,
48    /// Output to `stderr` instead of `stdout`.
49    stderr: bool,
50    /// Whether to include thread name in the log.
51    with_thread_name: bool,
52    /// Override target settings.
53    targets: Vec<(String, tracing::metadata::LevelFilter)>,
54    /// Override the default level.
55    default_level: Option<tracing::metadata::LevelFilter>,
56    /// The endpoint of the tracing collector in OTLP gRPC protocol.
57    tracing_endpoint: Option<String>,
58}
59
60impl Default for LoggerSettings {
61    fn default() -> Self {
62        Self::new("risingwave")
63    }
64}
65
66impl LoggerSettings {
67    /// Create a new logger settings from the given command-line options.
68    ///
69    /// If env var `RW_TRACING_ENDPOINT` is not set, the meta address will be used
70    /// as the default tracing endpoint, which means that the embedded tracing
71    /// collector will be used.
72    pub fn from_opts<O: risingwave_common::opts::Opts>(opts: &O) -> Self {
73        let mut settings = Self::new(O::name());
74        if settings.tracing_endpoint.is_none() // no explicit endpoint
75            && let Some(addr) = opts.meta_addr().exactly_one()
76        // meta address is valid
77        {
78            // Use embedded collector in the meta service.
79            // TODO: when there's multiple meta nodes for high availability, we may send
80            // to a wrong node here.
81            settings.tracing_endpoint = Some(addr.to_string());
82        }
83        settings
84    }
85
86    /// Create a new logger settings with the given service name.
87    pub fn new(name: impl Into<String>) -> Self {
88        Self {
89            name: name.into(),
90            enable_tokio_console: false,
91            colorful: console::colors_enabled_stderr() && console::colors_enabled(),
92            stderr: false,
93            with_thread_name: false,
94            targets: vec![],
95            default_level: None,
96            tracing_endpoint: std::env::var("RW_TRACING_ENDPOINT").ok(),
97        }
98    }
99
100    /// Enable tokio console output.
101    pub fn tokio_console(mut self, enabled: bool) -> Self {
102        self.enable_tokio_console = enabled;
103        self
104    }
105
106    /// Output to `stderr` instead of `stdout`.
107    pub fn stderr(mut self, enabled: bool) -> Self {
108        self.stderr = enabled;
109        self
110    }
111
112    /// Whether to include thread name in the log.
113    pub fn with_thread_name(mut self, enabled: bool) -> Self {
114        self.with_thread_name = enabled;
115        self
116    }
117
118    /// Overrides the default target settings.
119    pub fn with_target(
120        mut self,
121        target: impl Into<String>,
122        level: impl Into<tracing::metadata::LevelFilter>,
123    ) -> Self {
124        self.targets.push((target.into(), level.into()));
125        self
126    }
127
128    /// Overrides the default level.
129    pub fn with_default(mut self, level: impl Into<tracing::metadata::LevelFilter>) -> Self {
130        self.default_level = Some(level.into());
131        self
132    }
133
134    /// Overrides the tracing endpoint.
135    pub fn with_tracing_endpoint(mut self, endpoint: impl Into<String>) -> Self {
136        self.tracing_endpoint = Some(endpoint.into());
137        self
138    }
139}
140
141/// Create a filter that disables all events or spans.
142fn disabled_filter() -> filter::Targets {
143    filter::Targets::new()
144}
145
146/// Init logger for RisingWave binaries.
147///
148/// ## Environment variables to configure logger dynamically
149///
150/// ### `RUST_LOG`
151///
152/// Overrides default level and tracing targets of the fmt layer (formatting and
153/// logging to `stdout` or `stderr`).
154///
155/// Note that only verbosity levels below or equal to `DEBUG` are effective in
156/// release builds.
157///
158/// e.g.,
159/// ```bash
160/// RUST_LOG="info,risingwave_stream=debug,events=debug"
161/// ```
162///
163/// ### `RW_QUERY_LOG_PATH`
164///
165/// Configures the path to generate query log.
166///
167/// If it is set,
168/// - Dump logs of all SQLs, i.e., tracing target [`PGWIRE_QUERY_LOG`] to
169///   `RW_QUERY_LOG_PATH/query.log`.
170/// - Dump slow queries, i.e., tracing target [`PGWIRE_SLOW_QUERY_LOG`] to
171///   `RW_QUERY_LOG_PATH/slow_query.log`.
172///
173/// Note:
174/// To enable query log in the fmt layer (slow query is included by default), set
175/// ```bash
176/// RUST_LOG="pgwire_query_log=info"
177/// ```
178///
179/// `RW_QUERY_LOG_TRUNCATE_LEN` configures the max length of the SQLs logged in the query log,
180/// to avoid the log file growing too large. The default value is 1024 in production.
181///
182/// ### `ENABLE_PRETTY_LOG`
183///
184/// If it is set to `true`, enable pretty log output, which contains line numbers and prints spans in multiple lines.
185/// This can be helpful for development and debugging.
186///
187/// Hint: Also turn off other uninteresting logs to make the most of the pretty log.
188/// e.g.,
189/// ```bash
190/// RUST_LOG="risingwave_storage::hummock::event_handler=off,batch_execute=off,risingwave_batch::task=off" ENABLE_PRETTY_LOG=true risedev d
191/// ```
192pub fn init_risingwave_logger(settings: LoggerSettings) {
193    let deployment = Deployment::current();
194
195    // Default timer for logging with local time offset.
196    let default_timer = OffsetTime::local_rfc_3339().unwrap_or_else(|e| {
197        println!(
198            "failed to get local time offset, falling back to UTC: {}",
199            e.as_report()
200        );
201        OffsetTime::new(
202            time::UtcOffset::UTC,
203            time::format_description::well_known::Rfc3339,
204        )
205    });
206
207    // Default filter for logging to stdout and tracing.
208    let default_filter = {
209        let mut filter = filter::Targets::new();
210
211        // Configure levels for some RisingWave crates. Can still be overridden by `RUST_LOG`.
212        // Other RisingWave crates like `stream` and `storage` will follow the default level.
213        filter = filter
214            .with_target("auto_schema_change", Level::INFO)
215            .with_target("risingwave_sqlparser", Level::INFO)
216            .with_target("risingwave_connector_node", Level::INFO)
217            .with_target("pgwire", Level::INFO)
218            .with_target(PGWIRE_QUERY_LOG, Level::OFF)
219            // debug-purposed events are disabled unless `RUST_LOG` overrides
220            .with_target("events", Level::OFF);
221
222        // Configure levels for external crates. Can still be overridden by `RUST_LOG`.
223        // Other external crates will follow the default level.
224        filter = filter
225            .with_target("foyer", Level::INFO)
226            .with_target("aws", Level::INFO)
227            .with_target("aws_config", Level::WARN)
228            .with_target("aws_endpoint", Level::WARN)
229            .with_target("aws_credential_types::cache::lazy_caching", Level::WARN)
230            .with_target("hyper", Level::WARN)
231            .with_target("h2", Level::WARN)
232            .with_target("tower", Level::WARN)
233            .with_target("tonic", Level::WARN)
234            .with_target("isahc", Level::WARN)
235            .with_target("console_subscriber", Level::WARN)
236            .with_target("reqwest", Level::WARN)
237            .with_target("sled", Level::INFO)
238            .with_target("cranelift", Level::INFO)
239            .with_target("wasmtime", Level::INFO)
240            .with_target("sqlx", Level::WARN)
241            .with_target("opendal", Level::INFO)
242            .with_target("reqsign", Level::INFO)
243            .with_target("jni", Level::INFO)
244            .with_target("async_nats", Level::WARN);
245
246        // For all other crates, apply default level depending on the deployment and `debug_assertions` flag.
247        let default_level = match deployment {
248            Deployment::Ci => Level::INFO,
249            _ => {
250                if cfg!(debug_assertions) {
251                    Level::DEBUG
252                } else {
253                    Level::INFO
254                }
255            }
256        };
257        filter = filter.with_default(default_level);
258
259        // Overrides from settings.
260        filter = filter.with_targets(settings.targets);
261        if let Some(default_level) = settings.default_level {
262            filter = filter.with_default(default_level);
263        }
264
265        // Overrides from env var.
266        if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV)
267            && !rust_log.is_empty()
268        {
269            let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`");
270            if let Some(default_level) = rust_log_targets.default_level() {
271                filter = filter.with_default(default_level);
272            }
273            filter = filter.with_targets(rust_log_targets)
274        };
275
276        filter
277    };
278
279    let mut layers = vec![];
280
281    // fmt layer (formatting and logging to `stdout` or `stderr`)
282    {
283        let fmt_layer = tracing_subscriber::fmt::layer()
284            .with_thread_names(settings.with_thread_name)
285            .with_timer(default_timer.clone())
286            .with_ansi(settings.colorful)
287            .with_writer(move || {
288                if settings.stderr {
289                    Either::Left(std::io::stderr())
290                } else {
291                    Either::Right(std::io::stdout())
292                }
293            });
294
295        let fmt_layer = match deployment {
296            Deployment::Ci => fmt_layer.compact().boxed(),
297            Deployment::Cloud => fmt_layer
298                .json()
299                .map_event_format(|e| e.with_current_span(false)) // avoid duplication as there's a span list field
300                .boxed(),
301            Deployment::Other => {
302                if env_var_is_true("ENABLE_PRETTY_LOG") {
303                    fmt_layer.pretty().boxed()
304                } else {
305                    fmt_layer.boxed()
306                }
307            }
308        };
309
310        layers.push(
311            fmt_layer
312                .with_filter(default_filter.clone().with_target("rw_tracing", Level::OFF)) // filter-out tracing-only events
313                .boxed(),
314        );
315    };
316
317    // If `RW_QUERY_LOG_PATH` env var is set to a directory, turn on query log files.
318    let query_log_path = std::env::var("RW_QUERY_LOG_PATH");
319    if let Ok(query_log_path) = query_log_path {
320        let query_log_path = PathBuf::from(query_log_path);
321        std::fs::create_dir_all(query_log_path.clone()).unwrap_or_else(|e| {
322            panic!(
323                "failed to create directory '{}' for query log: {}",
324                query_log_path.display(),
325                e.as_report(),
326            )
327        });
328
329        /// Newtype wrapper for `DefaultFields`.
330        ///
331        /// `fmt::Layer` will share the same `FormattedFields` extension for spans across
332        /// different layers, as long as the type of `N: FormatFields` is the same. This
333        /// will cause several problems:
334        ///
335        /// - `with_ansi(false)` does not take effect and it will follow the settings of
336        ///   the primary fmt layer installed above.
337        /// - `Span::record` will update the same `FormattedFields` multiple times,
338        ///   leading to duplicated fields.
339        ///
340        /// As a workaround, we use a newtype wrapper here to get a different type id.
341        /// The const generic parameter `SLOW` is further used to distinguish between the
342        /// query log and the slow query log.
343        #[derive(Default)]
344        struct FmtFields<const SLOW: bool>(DefaultFields);
345
346        impl<'writer, const SLOW: bool> FormatFields<'writer> for FmtFields<SLOW> {
347            fn format_fields<R: tracing_subscriber::field::RecordFields>(
348                &self,
349                writer: tracing_subscriber::fmt::format::Writer<'writer>,
350                fields: R,
351            ) -> std::fmt::Result {
352                self.0.format_fields(writer, fields)
353            }
354        }
355
356        for (file_name, target, is_slow) in [
357            ("query.log", PGWIRE_QUERY_LOG, false),
358            ("slow_query.log", PGWIRE_SLOW_QUERY_LOG, true),
359        ] {
360            let path = query_log_path.join(file_name);
361
362            let file = std::fs::OpenOptions::new()
363                .create(true)
364                .write(true)
365                .truncate(true)
366                .open(&path)
367                .unwrap_or_else(|e| {
368                    panic!("failed to create `{}`: {}", path.display(), e.as_report(),)
369                });
370
371            let layer = tracing_subscriber::fmt::layer()
372                .with_ansi(false)
373                .with_level(false)
374                .with_file(false)
375                .with_target(false)
376                .with_timer(default_timer.clone())
377                .with_thread_names(true)
378                .with_thread_ids(true)
379                .with_writer(file);
380
381            let layer = match is_slow {
382                true => layer.fmt_fields(FmtFields::<true>::default()).boxed(),
383                false => layer.fmt_fields(FmtFields::<false>::default()).boxed(),
384            };
385
386            let layer = layer.with_filter(
387                filter::Targets::new()
388                    // Root span must be enabled to provide common info like the SQL query.
389                    .with_target(PGWIRE_ROOT_SPAN_TARGET, Level::INFO)
390                    .with_target(target, Level::INFO),
391            );
392
393            layers.push(layer.boxed());
394        }
395    }
396
397    if settings.enable_tokio_console {
398        let (console_layer, server) = console_subscriber::ConsoleLayer::builder()
399            .with_default_env()
400            .build();
401        let console_layer = console_layer.with_filter(
402            filter::Targets::new()
403                .with_target("tokio", Level::TRACE)
404                .with_target("runtime", Level::TRACE),
405        );
406        layers.push(console_layer.boxed());
407        std::thread::spawn(|| {
408            tokio::runtime::Builder::new_current_thread()
409                .enable_all()
410                .build()
411                .unwrap()
412                .block_on(async move {
413                    println!("serving console subscriber");
414                    server.serve().await.unwrap();
415                });
416        });
417    };
418
419    // Tracing layer
420    #[cfg(not(madsim))]
421    if let Some(endpoint) = settings.tracing_endpoint {
422        println!("opentelemetry tracing will be exported to `{endpoint}` if enabled");
423
424        use opentelemetry::KeyValue;
425        use opentelemetry_otlp::WithExportConfig;
426        use opentelemetry_semantic_conventions::resource;
427
428        let id = format!(
429            "{}-{}",
430            hostname::get()
431                .ok()
432                .and_then(|o| o.into_string().ok())
433                .unwrap_or_default(),
434            std::process::id()
435        );
436
437        let (otel_tracer, exporter) = {
438            let runtime = tokio::runtime::Builder::new_multi_thread()
439                .enable_all()
440                .thread_name("rw-otel")
441                .worker_threads(2)
442                .build()
443                .unwrap();
444            let runtime = Box::leak(Box::new(runtime));
445
446            // Installing the exporter requires a tokio runtime.
447            let _entered = runtime.enter();
448
449            // TODO(bugen): better service name
450            // https://github.com/jaegertracing/jaeger-ui/issues/336
451            let service_name = format!("{}-{}", settings.name, id);
452            let otel_tracer = TracerProviderBuilder::default()
453                .with_batch_exporter(
454                    SpanExporter::builder()
455                        .with_tonic()
456                        .with_endpoint(&endpoint)
457                        .build()
458                        .unwrap(),
459                )
460                .with_resource(
461                    Resource::builder()
462                        .with_attributes([
463                            KeyValue::new(resource::SERVICE_NAME, service_name.clone()),
464                            KeyValue::new(resource::SERVICE_INSTANCE_ID, id.clone()),
465                            KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
466                            KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
467                        ])
468                        .build(),
469                )
470                .build()
471                .tracer(service_name);
472
473            let exporter = SpanExporter::builder()
474                .with_tonic()
475                .with_endpoint(&endpoint)
476                .with_protocol(opentelemetry_otlp::Protocol::Grpc)
477                .with_timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
478                .build()
479                .unwrap();
480
481            (otel_tracer, exporter)
482        };
483
484        // Disable by filtering out all events or spans by default.
485        //
486        // It'll be enabled with `toggle_otel_layer` based on the system parameter `enable_tracing` later.
487        let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter());
488
489        set_toggle_otel_layer_fn(move |enabled: bool| {
490            let result = reload_handle.modify(|f| {
491                *f = if enabled {
492                    default_filter.clone()
493                } else {
494                    disabled_filter()
495                }
496            });
497
498            match result {
499                Ok(_) => tracing::info!(
500                    "opentelemetry tracing {}",
501                    if enabled { "enabled" } else { "disabled" },
502                ),
503
504                Err(error) => tracing::error!(
505                    error = %error.as_report(),
506                    "failed to {} opentelemetry tracing",
507                    if enabled { "enable" } else { "disable" },
508                ),
509            }
510        });
511
512        let layer = tracing_opentelemetry::layer()
513            .with_tracer(otel_tracer)
514            .with_filter(reload_filter);
515
516        layers.push(layer.boxed());
517
518        // The reporter is used by fastrace in foyer for dynamically tail-based tracing.
519        //
520        // Code here only setup the OpenTelemetry reporter. To enable/disable the function, please use risectl.
521        //
522        // e.g.
523        //
524        // ```bash
525        // risectl hummock tiered-cache-tracing -h
526        // ```
527        let reporter = OpenTelemetryReporter::new(
528            exporter,
529            SpanKind::Server,
530            Cow::Owned(
531                Resource::builder()
532                    .with_service_name(format!("fastrace-{id}"))
533                    .build(),
534            ),
535            InstrumentationScope::builder("opentelemetry-instrumentation-foyer").build(),
536        );
537        fastrace::set_reporter(reporter, fastrace::collector::Config::default());
538        tracing::info!("opentelemetry exporter for fastrace is set at {endpoint}");
539    }
540
541    // Metrics layer
542    {
543        let filter = filter::Targets::new().with_target("aws_smithy_client::retry", Level::DEBUG);
544
545        layers.push(Box::new(MetricsLayer::new().with_filter(filter)));
546    }
547    tracing_subscriber::registry().with(layers).init();
548    // TODO: add file-appender tracing subscriber in the future
549}