risingwave_rt/
logger.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::env;
17use std::path::PathBuf;
18
19use either::Either;
20use fastrace_opentelemetry::OpenTelemetryReporter;
21use opentelemetry::InstrumentationScope;
22use opentelemetry::trace::{SpanKind, TracerProvider};
23use opentelemetry_otlp::SpanExporter;
24use opentelemetry_sdk::Resource;
25use opentelemetry_sdk::trace::TracerProviderBuilder;
26use risingwave_common::metrics::MetricsLayer;
27use risingwave_common::util::deployment::Deployment;
28use risingwave_common::util::env_var::env_var_is_true;
29use risingwave_common::util::query_log::*;
30use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn;
31use thiserror_ext::AsReport;
32use tracing::level_filters::LevelFilter as Level;
33use tracing_subscriber::filter::Targets;
34use tracing_subscriber::fmt::FormatFields;
35use tracing_subscriber::fmt::format::DefaultFields;
36use tracing_subscriber::fmt::time::OffsetTime;
37use tracing_subscriber::layer::SubscriberExt;
38use tracing_subscriber::prelude::*;
39use tracing_subscriber::{EnvFilter, filter, reload};
40
41pub struct LoggerSettings {
42    /// The name of the service. Used to identify the service in distributed tracing.
43    name: String,
44    /// Enable tokio console output.
45    enable_tokio_console: bool,
46    /// Enable colorful output in console.
47    colorful: bool,
48    /// Output to `stderr` instead of `stdout`.
49    stderr: bool,
50    /// Whether to include thread name in the log.
51    with_thread_name: bool,
52    /// Override target settings.
53    targets: Vec<(String, tracing::metadata::LevelFilter)>,
54    /// Override the default level.
55    default_level: Option<tracing::metadata::LevelFilter>,
56    /// The endpoint of the tracing collector in OTLP gRPC protocol.
57    tracing_endpoint: Option<String>,
58}
59
60impl Default for LoggerSettings {
61    fn default() -> Self {
62        Self::new("risingwave")
63    }
64}
65
66impl LoggerSettings {
67    /// Create a new logger settings from the given command-line options.
68    ///
69    /// If env var `RW_TRACING_ENDPOINT` is not set, the meta address will be used
70    /// as the default tracing endpoint, which means that the embedded tracing
71    /// collector will be used.
72    pub fn from_opts<O: risingwave_common::opts::Opts>(opts: &O) -> Self {
73        let mut settings = Self::new(O::name());
74        if settings.tracing_endpoint.is_none() // no explicit endpoint
75            && let Some(addr) = opts.meta_addr().exactly_one()
76        // meta address is valid
77        {
78            // Use embedded collector in the meta service.
79            // TODO: when there's multiple meta nodes for high availability, we may send
80            // to a wrong node here.
81            settings.tracing_endpoint = Some(addr.to_string());
82        }
83        settings
84    }
85
86    /// Create a new logger settings with the given service name.
87    pub fn new(name: impl Into<String>) -> Self {
88        Self {
89            name: name.into(),
90            enable_tokio_console: false,
91            colorful: console::colors_enabled_stderr() && console::colors_enabled(),
92            stderr: false,
93            with_thread_name: false,
94            targets: vec![],
95            default_level: None,
96            tracing_endpoint: std::env::var("RW_TRACING_ENDPOINT").ok(),
97        }
98    }
99
100    /// Enable tokio console output.
101    pub fn tokio_console(mut self, enabled: bool) -> Self {
102        self.enable_tokio_console = enabled;
103        self
104    }
105
106    /// Output to `stderr` instead of `stdout`.
107    pub fn stderr(mut self, enabled: bool) -> Self {
108        self.stderr = enabled;
109        self
110    }
111
112    /// Whether to include thread name in the log.
113    pub fn with_thread_name(mut self, enabled: bool) -> Self {
114        self.with_thread_name = enabled;
115        self
116    }
117
118    /// Overrides the default target settings.
119    pub fn with_target(
120        mut self,
121        target: impl Into<String>,
122        level: impl Into<tracing::metadata::LevelFilter>,
123    ) -> Self {
124        self.targets.push((target.into(), level.into()));
125        self
126    }
127
128    /// Overrides the default level.
129    pub fn with_default(mut self, level: impl Into<tracing::metadata::LevelFilter>) -> Self {
130        self.default_level = Some(level.into());
131        self
132    }
133
134    /// Overrides the tracing endpoint.
135    pub fn with_tracing_endpoint(mut self, endpoint: impl Into<String>) -> Self {
136        self.tracing_endpoint = Some(endpoint.into());
137        self
138    }
139}
140
141/// Create a filter that disables all events or spans.
142fn disabled_filter() -> filter::Targets {
143    filter::Targets::new()
144}
145
146/// Init logger for RisingWave binaries.
147///
148/// ## Environment variables to configure logger dynamically
149///
150/// ### `RUST_LOG`
151///
152/// Overrides default level and tracing targets of the fmt layer (formatting and
153/// logging to `stdout` or `stderr`).
154///
155/// Note that only verbosity levels below or equal to `DEBUG` are effective in
156/// release builds.
157///
158/// e.g.,
159/// ```bash
160/// RUST_LOG="info,risingwave_stream=debug,events=debug"
161/// ```
162///
163/// ### `RW_QUERY_LOG_PATH`
164///
165/// Configures the path to generate query log.
166///
167/// If it is set,
168/// - Dump logs of all SQLs, i.e., tracing target [`PGWIRE_QUERY_LOG`] to
169///   `RW_QUERY_LOG_PATH/query.log`.
170/// - Dump slow queries, i.e., tracing target [`PGWIRE_SLOW_QUERY_LOG`] to
171///   `RW_QUERY_LOG_PATH/slow_query.log`.
172///
173/// Note:
174/// To enable query log in the fmt layer (slow query is included by default), set
175/// ```bash
176/// RUST_LOG="pgwire_query_log=info"
177/// ```
178///
179/// `RW_QUERY_LOG_TRUNCATE_LEN` configures the max length of the SQLs logged in the query log,
180/// to avoid the log file growing too large. The default value is 1024 in production.
181///
182/// ### `ENABLE_PRETTY_LOG`
183///
184/// If it is set to `true`, enable pretty log output, which contains line numbers and prints spans in multiple lines.
185/// This can be helpful for development and debugging.
186///
187/// Hint: Also turn off other uninteresting logs to make the most of the pretty log.
188/// e.g.,
189/// ```bash
190/// RUST_LOG="risingwave_storage::hummock::event_handler=off,batch_execute=off,risingwave_batch::task=off" ENABLE_PRETTY_LOG=true risedev d
191/// ```
192pub fn init_risingwave_logger(settings: LoggerSettings) {
193    let deployment = Deployment::current();
194
195    // Default timer for logging with local time offset.
196    let default_timer = OffsetTime::local_rfc_3339().unwrap_or_else(|e| {
197        println!(
198            "failed to get local time offset, falling back to UTC: {}",
199            e.as_report()
200        );
201        OffsetTime::new(
202            time::UtcOffset::UTC,
203            time::format_description::well_known::Rfc3339,
204        )
205    });
206
207    // Default filter for logging to stdout and tracing.
208    let default_filter = {
209        let mut filter = filter::Targets::new();
210
211        // Configure levels for some RisingWave crates.
212        // Other RisingWave crates like `stream` and `storage` will follow the default level.
213        filter = filter
214            .with_target("auto_schema_change", Level::INFO)
215            .with_target("risingwave_sqlparser", Level::INFO)
216            .with_target("risingwave_connector_node", Level::INFO)
217            .with_target("pgwire", Level::INFO)
218            .with_target(PGWIRE_QUERY_LOG, Level::OFF)
219            // debug-purposed events are disabled unless `RUST_LOG` overrides
220            .with_target("events", Level::OFF);
221
222        // Configure levels for external crates.
223        filter = filter
224            .with_target("foyer", Level::INFO)
225            .with_target("aws", Level::INFO)
226            .with_target("aws_config", Level::WARN)
227            .with_target("aws_endpoint", Level::WARN)
228            .with_target("aws_credential_types::cache::lazy_caching", Level::WARN)
229            .with_target("hyper", Level::WARN)
230            .with_target("h2", Level::WARN)
231            .with_target("tower", Level::WARN)
232            .with_target("tonic", Level::WARN)
233            .with_target("isahc", Level::WARN)
234            .with_target("console_subscriber", Level::WARN)
235            .with_target("reqwest", Level::WARN)
236            .with_target("sled", Level::INFO)
237            .with_target("cranelift", Level::INFO)
238            .with_target("wasmtime", Level::INFO)
239            .with_target("sqlx", Level::WARN)
240            .with_target("opendal", Level::INFO)
241            .with_target("reqsign", Level::INFO);
242
243        // For all other crates, apply default level depending on the deployment and `debug_assertions` flag.
244        let default_level = match deployment {
245            Deployment::Ci => Level::INFO,
246            _ => {
247                if cfg!(debug_assertions) {
248                    Level::DEBUG
249                } else {
250                    Level::INFO
251                }
252            }
253        };
254        filter = filter.with_default(default_level);
255
256        // Overrides from settings.
257        filter = filter.with_targets(settings.targets);
258        if let Some(default_level) = settings.default_level {
259            filter = filter.with_default(default_level);
260        }
261
262        // Overrides from env var.
263        if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV)
264            && !rust_log.is_empty()
265        {
266            let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`");
267            if let Some(default_level) = rust_log_targets.default_level() {
268                filter = filter.with_default(default_level);
269            }
270            filter = filter.with_targets(rust_log_targets)
271        };
272
273        filter
274    };
275
276    let mut layers = vec![];
277
278    // fmt layer (formatting and logging to `stdout` or `stderr`)
279    {
280        let fmt_layer = tracing_subscriber::fmt::layer()
281            .with_thread_names(settings.with_thread_name)
282            .with_timer(default_timer.clone())
283            .with_ansi(settings.colorful)
284            .with_writer(move || {
285                if settings.stderr {
286                    Either::Left(std::io::stderr())
287                } else {
288                    Either::Right(std::io::stdout())
289                }
290            });
291
292        let fmt_layer = match deployment {
293            Deployment::Ci => fmt_layer.compact().boxed(),
294            Deployment::Cloud => fmt_layer
295                .json()
296                .map_event_format(|e| e.with_current_span(false)) // avoid duplication as there's a span list field
297                .boxed(),
298            Deployment::Other => {
299                if env_var_is_true("ENABLE_PRETTY_LOG") {
300                    fmt_layer.pretty().boxed()
301                } else {
302                    fmt_layer.boxed()
303                }
304            }
305        };
306
307        layers.push(
308            fmt_layer
309                .with_filter(default_filter.clone().with_target("rw_tracing", Level::OFF)) // filter-out tracing-only events
310                .boxed(),
311        );
312    };
313
314    // If `RW_QUERY_LOG_PATH` env var is set to a directory, turn on query log files.
315    let query_log_path = std::env::var("RW_QUERY_LOG_PATH");
316    if let Ok(query_log_path) = query_log_path {
317        let query_log_path = PathBuf::from(query_log_path);
318        std::fs::create_dir_all(query_log_path.clone()).unwrap_or_else(|e| {
319            panic!(
320                "failed to create directory '{}' for query log: {}",
321                query_log_path.display(),
322                e.as_report(),
323            )
324        });
325
326        /// Newtype wrapper for `DefaultFields`.
327        ///
328        /// `fmt::Layer` will share the same `FormattedFields` extension for spans across
329        /// different layers, as long as the type of `N: FormatFields` is the same. This
330        /// will cause several problems:
331        ///
332        /// - `with_ansi(false)` does not take effect and it will follow the settings of
333        ///   the primary fmt layer installed above.
334        /// - `Span::record` will update the same `FormattedFields` multiple times,
335        ///   leading to duplicated fields.
336        ///
337        /// As a workaround, we use a newtype wrapper here to get a different type id.
338        /// The const generic parameter `SLOW` is further used to distinguish between the
339        /// query log and the slow query log.
340        #[derive(Default)]
341        struct FmtFields<const SLOW: bool>(DefaultFields);
342
343        impl<'writer, const SLOW: bool> FormatFields<'writer> for FmtFields<SLOW> {
344            fn format_fields<R: tracing_subscriber::field::RecordFields>(
345                &self,
346                writer: tracing_subscriber::fmt::format::Writer<'writer>,
347                fields: R,
348            ) -> std::fmt::Result {
349                self.0.format_fields(writer, fields)
350            }
351        }
352
353        for (file_name, target, is_slow) in [
354            ("query.log", PGWIRE_QUERY_LOG, false),
355            ("slow_query.log", PGWIRE_SLOW_QUERY_LOG, true),
356        ] {
357            let path = query_log_path.join(file_name);
358
359            let file = std::fs::OpenOptions::new()
360                .create(true)
361                .write(true)
362                .truncate(true)
363                .open(&path)
364                .unwrap_or_else(|e| {
365                    panic!("failed to create `{}`: {}", path.display(), e.as_report(),)
366                });
367
368            let layer = tracing_subscriber::fmt::layer()
369                .with_ansi(false)
370                .with_level(false)
371                .with_file(false)
372                .with_target(false)
373                .with_timer(default_timer.clone())
374                .with_thread_names(true)
375                .with_thread_ids(true)
376                .with_writer(file);
377
378            let layer = match is_slow {
379                true => layer.fmt_fields(FmtFields::<true>::default()).boxed(),
380                false => layer.fmt_fields(FmtFields::<false>::default()).boxed(),
381            };
382
383            let layer = layer.with_filter(
384                filter::Targets::new()
385                    // Root span must be enabled to provide common info like the SQL query.
386                    .with_target(PGWIRE_ROOT_SPAN_TARGET, Level::INFO)
387                    .with_target(target, Level::INFO),
388            );
389
390            layers.push(layer.boxed());
391        }
392    }
393
394    if settings.enable_tokio_console {
395        let (console_layer, server) = console_subscriber::ConsoleLayer::builder()
396            .with_default_env()
397            .build();
398        let console_layer = console_layer.with_filter(
399            filter::Targets::new()
400                .with_target("tokio", Level::TRACE)
401                .with_target("runtime", Level::TRACE),
402        );
403        layers.push(console_layer.boxed());
404        std::thread::spawn(|| {
405            tokio::runtime::Builder::new_current_thread()
406                .enable_all()
407                .build()
408                .unwrap()
409                .block_on(async move {
410                    println!("serving console subscriber");
411                    server.serve().await.unwrap();
412                });
413        });
414    };
415
416    // Tracing layer
417    #[cfg(not(madsim))]
418    if let Some(endpoint) = settings.tracing_endpoint {
419        println!("opentelemetry tracing will be exported to `{endpoint}` if enabled");
420
421        use opentelemetry::KeyValue;
422        use opentelemetry_otlp::WithExportConfig;
423        use opentelemetry_semantic_conventions::resource;
424
425        let id = format!(
426            "{}-{}",
427            hostname::get()
428                .ok()
429                .and_then(|o| o.into_string().ok())
430                .unwrap_or_default(),
431            std::process::id()
432        );
433
434        let (otel_tracer, exporter) = {
435            let runtime = tokio::runtime::Builder::new_multi_thread()
436                .enable_all()
437                .thread_name("rw-otel")
438                .worker_threads(2)
439                .build()
440                .unwrap();
441            let runtime = Box::leak(Box::new(runtime));
442
443            // Installing the exporter requires a tokio runtime.
444            let _entered = runtime.enter();
445
446            // TODO(bugen): better service name
447            // https://github.com/jaegertracing/jaeger-ui/issues/336
448            let service_name = format!("{}-{}", settings.name, id);
449            let otel_tracer = TracerProviderBuilder::default()
450                .with_batch_exporter(
451                    SpanExporter::builder()
452                        .with_tonic()
453                        .with_endpoint(&endpoint)
454                        .build()
455                        .unwrap(),
456                )
457                .with_resource(
458                    Resource::builder()
459                        .with_attributes([
460                            KeyValue::new(resource::SERVICE_NAME, service_name.clone()),
461                            KeyValue::new(resource::SERVICE_INSTANCE_ID, id.clone()),
462                            KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
463                            KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
464                        ])
465                        .build(),
466                )
467                .build()
468                .tracer(service_name);
469
470            let exporter = SpanExporter::builder()
471                .with_tonic()
472                .with_endpoint(&endpoint)
473                .with_protocol(opentelemetry_otlp::Protocol::Grpc)
474                .with_timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
475                .build()
476                .unwrap();
477
478            (otel_tracer, exporter)
479        };
480
481        // Disable by filtering out all events or spans by default.
482        //
483        // It'll be enabled with `toggle_otel_layer` based on the system parameter `enable_tracing` later.
484        let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter());
485
486        set_toggle_otel_layer_fn(move |enabled: bool| {
487            let result = reload_handle.modify(|f| {
488                *f = if enabled {
489                    default_filter.clone()
490                } else {
491                    disabled_filter()
492                }
493            });
494
495            match result {
496                Ok(_) => tracing::info!(
497                    "opentelemetry tracing {}",
498                    if enabled { "enabled" } else { "disabled" },
499                ),
500
501                Err(error) => tracing::error!(
502                    error = %error.as_report(),
503                    "failed to {} opentelemetry tracing",
504                    if enabled { "enable" } else { "disable" },
505                ),
506            }
507        });
508
509        let layer = tracing_opentelemetry::layer()
510            .with_tracer(otel_tracer)
511            .with_filter(reload_filter);
512
513        layers.push(layer.boxed());
514
515        // The reporter is used by fastrace in foyer for dynamically tail-based tracing.
516        //
517        // Code here only setup the OpenTelemetry reporter. To enable/disable the function, please use risectl.
518        //
519        // e.g.
520        //
521        // ```bash
522        // risectl hummock tiered-cache-tracing -h
523        // ```
524        let reporter = OpenTelemetryReporter::new(
525            exporter,
526            SpanKind::Server,
527            Cow::Owned(
528                Resource::builder()
529                    .with_service_name(format!("fastrace-{id}"))
530                    .build(),
531            ),
532            InstrumentationScope::builder("opentelemetry-instrumentation-foyer").build(),
533        );
534        fastrace::set_reporter(reporter, fastrace::collector::Config::default());
535        tracing::info!("opentelemetry exporter for fastrace is set at {endpoint}");
536    }
537
538    // Metrics layer
539    {
540        let filter = filter::Targets::new().with_target("aws_smithy_client::retry", Level::DEBUG);
541
542        layers.push(Box::new(MetricsLayer::new().with_filter(filter)));
543    }
544    tracing_subscriber::registry().with(layers).init();
545    // TODO: add file-appender tracing subscriber in the future
546}