1use std::borrow::Cow;
16use std::env;
17use std::path::PathBuf;
18
19use either::Either;
20use fastrace_opentelemetry::OpenTelemetryReporter;
21use opentelemetry::InstrumentationScope;
22use opentelemetry::trace::TracerProvider;
23use opentelemetry_otlp::SpanExporter;
24use opentelemetry_sdk::Resource;
25use opentelemetry_sdk::trace::TracerProviderBuilder;
26use risingwave_common::metrics::MetricsLayer;
27use risingwave_common::util::deployment::Deployment;
28use risingwave_common::util::env_var::env_var_is_true;
29use risingwave_common::util::query_log::*;
30use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn;
31use thiserror_ext::AsReport;
32use tracing::level_filters::LevelFilter as Level;
33use tracing_subscriber::filter::Targets;
34use tracing_subscriber::fmt::FormatFields;
35use tracing_subscriber::fmt::format::DefaultFields;
36use tracing_subscriber::fmt::time::OffsetTime;
37use tracing_subscriber::layer::SubscriberExt;
38use tracing_subscriber::prelude::*;
39use tracing_subscriber::{EnvFilter, filter, reload};
40
41fn parse_extra_tracing_attributes(input: &str) -> Vec<opentelemetry::KeyValue> {
49 use opentelemetry::KeyValue;
50
51 if input.trim().is_empty() {
52 return vec![];
53 }
54
55 input
56 .split(',')
57 .filter_map(|pair| {
58 let pair = pair.trim();
59 if pair.is_empty() {
60 return None;
61 }
62 let parts: Vec<&str> = pair.splitn(2, '=').collect();
63 if parts.len() != 2 {
64 eprintln!("warning: ignoring invalid tracing attribute pair (no '='): {pair:?}");
65 return None;
66 }
67 let key = parts[0].trim();
68 let value = parts[1].trim();
69 if key.is_empty() {
70 eprintln!("warning: ignoring tracing attribute with empty key: {pair:?}");
71 return None;
72 }
73 Some(KeyValue::new(key.to_owned(), value.to_owned()))
74 })
75 .collect()
76}
77
/// Configuration consumed by `init_risingwave_logger`.
///
/// Build it with `LoggerSettings::new` / `LoggerSettings::from_opts` and the builder methods.
pub struct LoggerSettings {
    /// Component name; used as the prefix of the OpenTelemetry service name (`{name}-{host}-{pid}`).
    name: String,
    /// Whether to install the tokio console layer and spawn a thread serving it.
    enable_tokio_console: bool,
    /// Whether the main fmt layer emits ANSI colors.
    colorful: bool,
    /// Whether the main fmt layer writes to stderr instead of stdout.
    stderr: bool,
    /// Whether the main fmt layer includes thread names.
    with_thread_name: bool,
    /// Per-target level overrides, applied after the built-in defaults (`RUST_LOG` still wins).
    targets: Vec<(String, tracing::metadata::LevelFilter)>,
    /// Optional override of the default level (`RUST_LOG` still wins).
    default_level: Option<tracing::metadata::LevelFilter>,
    /// OTLP endpoint for OpenTelemetry/fastrace export; `None` skips that setup entirely.
    tracing_endpoint: Option<String>,
    /// Extra resource attributes attached to exported traces (parsed from
    /// `RW_TRACING_EXTRA_ATTRIBUTES` in `new`).
    extra_tracing_attributes: Vec<opentelemetry::KeyValue>,
}
98
99impl Default for LoggerSettings {
100 fn default() -> Self {
101 Self::new("risingwave")
102 }
103}
104
105impl LoggerSettings {
106 pub fn from_opts<O: risingwave_common::opts::Opts>(opts: &O) -> Self {
112 let mut settings = Self::new(O::name());
113 if settings.tracing_endpoint.is_none() && let Some(addr) = opts.meta_addr().exactly_one()
115 {
117 settings.tracing_endpoint = Some(addr.to_string());
121 }
122 settings
123 }
124
125 pub fn new(name: impl Into<String>) -> Self {
127 Self {
128 name: name.into(),
129 enable_tokio_console: false,
130 colorful: console::colors_enabled_stderr() && console::colors_enabled(),
131 stderr: false,
132 with_thread_name: false,
133 targets: vec![],
134 default_level: None,
135 tracing_endpoint: std::env::var("RW_TRACING_ENDPOINT").ok(),
136 extra_tracing_attributes: std::env::var("RW_TRACING_EXTRA_ATTRIBUTES")
137 .ok()
138 .map(|v| parse_extra_tracing_attributes(&v))
139 .unwrap_or_default(),
140 }
141 }
142
143 pub fn tokio_console(mut self, enabled: bool) -> Self {
145 self.enable_tokio_console = enabled;
146 self
147 }
148
149 pub fn stderr(mut self, enabled: bool) -> Self {
151 self.stderr = enabled;
152 self
153 }
154
155 pub fn with_thread_name(mut self, enabled: bool) -> Self {
157 self.with_thread_name = enabled;
158 self
159 }
160
161 pub fn with_target(
163 mut self,
164 target: impl Into<String>,
165 level: impl Into<tracing::metadata::LevelFilter>,
166 ) -> Self {
167 self.targets.push((target.into(), level.into()));
168 self
169 }
170
171 pub fn with_default(mut self, level: impl Into<tracing::metadata::LevelFilter>) -> Self {
173 self.default_level = Some(level.into());
174 self
175 }
176
177 pub fn with_tracing_endpoint(mut self, endpoint: impl Into<String>) -> Self {
179 self.tracing_endpoint = Some(endpoint.into());
180 self
181 }
182}
183
184fn disabled_filter() -> filter::Targets {
186 filter::Targets::new()
187}
188
189pub fn init_risingwave_logger(settings: LoggerSettings) {
236 let deployment = Deployment::current();
237
238 let default_timer = OffsetTime::local_rfc_3339().unwrap_or_else(|e| {
240 println!(
241 "failed to get local time offset, falling back to UTC: {}",
242 e.as_report()
243 );
244 OffsetTime::new(
245 time::UtcOffset::UTC,
246 time::format_description::well_known::Rfc3339,
247 )
248 });
249
250 let default_filter = {
252 let mut filter = filter::Targets::new();
253
254 filter = filter
257 .with_target("auto_schema_change", Level::INFO)
258 .with_target("risingwave_sqlparser", Level::INFO)
259 .with_target("risingwave_connector_node", Level::INFO)
260 .with_target("pgwire", Level::INFO)
261 .with_target(PGWIRE_QUERY_LOG, Level::OFF)
262 .with_target("events", Level::OFF);
264
265 filter = filter
268 .with_target("foyer", Level::INFO)
269 .with_target("aws", Level::INFO)
270 .with_target("aws_config", Level::WARN)
271 .with_target("aws_endpoint", Level::WARN)
272 .with_target("aws_credential_types::cache::lazy_caching", Level::WARN)
273 .with_target("hyper", Level::WARN)
274 .with_target("h2", Level::WARN)
275 .with_target("tower", Level::WARN)
276 .with_target("tonic", Level::WARN)
277 .with_target("isahc", Level::WARN)
278 .with_target("console_subscriber", Level::WARN)
279 .with_target("reqwest", Level::WARN)
280 .with_target("sled", Level::INFO)
281 .with_target("cranelift", Level::INFO)
282 .with_target("wasmtime", Level::INFO)
283 .with_target("sqlx", Level::WARN)
284 .with_target("opendal", Level::INFO)
285 .with_target("reqsign", Level::INFO)
286 .with_target("jni", Level::INFO)
287 .with_target("async_nats", Level::WARN);
288
289 let default_level = match deployment {
291 Deployment::Ci => Level::INFO,
292 _ => {
293 if cfg!(debug_assertions) {
294 Level::DEBUG
295 } else {
296 Level::INFO
297 }
298 }
299 };
300 filter = filter.with_default(default_level);
301
302 filter = filter.with_targets(settings.targets);
304 if let Some(default_level) = settings.default_level {
305 filter = filter.with_default(default_level);
306 }
307
308 if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV)
310 && !rust_log.is_empty()
311 {
312 let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`");
313 if let Some(default_level) = rust_log_targets.default_level() {
314 filter = filter.with_default(default_level);
315 }
316 filter = filter.with_targets(rust_log_targets)
317 };
318
319 filter
320 };
321
322 let mut layers = vec![];
323
324 {
326 let fmt_layer = tracing_subscriber::fmt::layer()
327 .with_thread_names(settings.with_thread_name)
328 .with_timer(default_timer.clone())
329 .with_ansi(settings.colorful)
330 .with_writer(move || {
331 if settings.stderr {
332 Either::Left(std::io::stderr())
333 } else {
334 Either::Right(std::io::stdout())
335 }
336 });
337
338 let fmt_layer = match deployment {
339 Deployment::Ci => fmt_layer.compact().boxed(),
340 Deployment::Cloud => fmt_layer
341 .json()
342 .map_event_format(|e| e.with_current_span(false)) .boxed(),
344 Deployment::Other => {
345 if env_var_is_true("ENABLE_PRETTY_LOG") {
346 fmt_layer.pretty().boxed()
347 } else {
348 fmt_layer.boxed()
349 }
350 }
351 };
352
353 layers.push(
354 fmt_layer
355 .with_filter(default_filter.clone().with_target("rw_tracing", Level::OFF)) .boxed(),
357 );
358 };
359
360 let query_log_path = std::env::var("RW_QUERY_LOG_PATH");
362 if let Ok(query_log_path) = query_log_path {
363 let query_log_path = PathBuf::from(query_log_path);
364 std::fs::create_dir_all(query_log_path.clone()).unwrap_or_else(|e| {
365 panic!(
366 "failed to create directory '{}' for query log: {}",
367 query_log_path.display(),
368 e.as_report(),
369 )
370 });
371
372 #[derive(Default)]
387 struct FmtFields<const SLOW: bool>(DefaultFields);
388
389 impl<'writer, const SLOW: bool> FormatFields<'writer> for FmtFields<SLOW> {
390 fn format_fields<R: tracing_subscriber::field::RecordFields>(
391 &self,
392 writer: tracing_subscriber::fmt::format::Writer<'writer>,
393 fields: R,
394 ) -> std::fmt::Result {
395 self.0.format_fields(writer, fields)
396 }
397 }
398
399 for (file_name, target, is_slow) in [
400 ("query.log", PGWIRE_QUERY_LOG, false),
401 ("slow_query.log", PGWIRE_SLOW_QUERY_LOG, true),
402 ] {
403 let path = query_log_path.join(file_name);
404
405 let file = std::fs::OpenOptions::new()
406 .create(true)
407 .write(true)
408 .truncate(true)
409 .open(&path)
410 .unwrap_or_else(|e| {
411 panic!("failed to create `{}`: {}", path.display(), e.as_report(),)
412 });
413
414 let layer = tracing_subscriber::fmt::layer()
415 .with_ansi(false)
416 .with_level(false)
417 .with_file(false)
418 .with_target(false)
419 .with_timer(default_timer.clone())
420 .with_thread_names(true)
421 .with_thread_ids(true)
422 .with_writer(file);
423
424 let layer = match is_slow {
425 true => layer.fmt_fields(FmtFields::<true>::default()).boxed(),
426 false => layer.fmt_fields(FmtFields::<false>::default()).boxed(),
427 };
428
429 let layer = layer.with_filter(
430 filter::Targets::new()
431 .with_target(PGWIRE_ROOT_SPAN_TARGET, Level::INFO)
433 .with_target(target, Level::INFO),
434 );
435
436 layers.push(layer.boxed());
437 }
438 }
439
440 if settings.enable_tokio_console {
441 let (console_layer, server) = console_subscriber::ConsoleLayer::builder()
442 .with_default_env()
443 .build();
444 let console_layer = console_layer.with_filter(
445 filter::Targets::new()
446 .with_target("tokio", Level::TRACE)
447 .with_target("runtime", Level::TRACE),
448 );
449 layers.push(console_layer.boxed());
450 std::thread::spawn(|| {
451 tokio::runtime::Builder::new_current_thread()
452 .enable_all()
453 .build()
454 .unwrap()
455 .block_on(async move {
456 println!("serving console subscriber");
457 server.serve().await.unwrap();
458 });
459 });
460 };
461
462 #[cfg(not(madsim))]
464 if let Some(endpoint) = settings.tracing_endpoint {
465 println!("opentelemetry tracing will be exported to `{endpoint}` if enabled");
466
467 use opentelemetry::KeyValue;
468 use opentelemetry_otlp::WithExportConfig;
469 use opentelemetry_semantic_conventions::resource;
470
471 let id = format!(
472 "{}-{}",
473 hostname::get()
474 .ok()
475 .and_then(|o| o.into_string().ok())
476 .unwrap_or_default(),
477 std::process::id()
478 );
479
480 let extra_attributes = &settings.extra_tracing_attributes;
481 if !extra_attributes.is_empty() {
482 println!(
483 "extra tracing resource attributes: {:?}",
484 extra_attributes
485 .iter()
486 .map(|kv| format!("{}={}", kv.key, kv.value))
487 .collect::<Vec<_>>()
488 );
489 }
490
491 let (otel_tracer, exporter) = {
492 let runtime = tokio::runtime::Builder::new_multi_thread()
493 .enable_all()
494 .thread_name("rw-otel")
495 .worker_threads(2)
496 .build()
497 .unwrap();
498 let runtime = Box::leak(Box::new(runtime));
499
500 let _entered = runtime.enter();
502
503 let service_name = format!("{}-{}", settings.name, id);
506
507 let mut resource_attrs = vec![
508 KeyValue::new(resource::SERVICE_NAME, service_name.clone()),
509 KeyValue::new(resource::SERVICE_INSTANCE_ID, id.clone()),
510 KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
511 KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
512 ];
513 resource_attrs.extend(extra_attributes.iter().cloned());
514
515 let otel_tracer = TracerProviderBuilder::default()
516 .with_batch_exporter(
517 SpanExporter::builder()
518 .with_tonic()
519 .with_endpoint(&endpoint)
520 .build()
521 .unwrap(),
522 )
523 .with_resource(Resource::builder().with_attributes(resource_attrs).build())
524 .build()
525 .tracer(service_name);
526
527 let exporter = SpanExporter::builder()
528 .with_tonic()
529 .with_endpoint(&endpoint)
530 .with_protocol(opentelemetry_otlp::Protocol::Grpc)
531 .with_timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
532 .build()
533 .unwrap();
534
535 (otel_tracer, exporter)
536 };
537
538 let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter());
542
543 set_toggle_otel_layer_fn(move |enabled: bool| {
544 let result = reload_handle.modify(|f| {
545 *f = if enabled {
546 default_filter.clone()
547 } else {
548 disabled_filter()
549 }
550 });
551
552 match result {
553 Ok(_) => tracing::info!(
554 "opentelemetry tracing {}",
555 if enabled { "enabled" } else { "disabled" },
556 ),
557
558 Err(error) => tracing::error!(
559 error = %error.as_report(),
560 "failed to {} opentelemetry tracing",
561 if enabled { "enable" } else { "disable" },
562 ),
563 }
564 });
565
566 let layer = tracing_opentelemetry::layer()
567 .with_tracer(otel_tracer)
568 .with_filter(reload_filter);
569
570 layers.push(layer.boxed());
571
572 let mut fastrace_resource_attrs: Vec<opentelemetry::KeyValue> =
582 vec![opentelemetry::KeyValue::new(
583 opentelemetry_semantic_conventions::resource::SERVICE_NAME,
584 format!("fastrace-{id}"),
585 )];
586 fastrace_resource_attrs.extend(extra_attributes.iter().cloned());
587
588 let reporter = OpenTelemetryReporter::new(
589 exporter,
590 Cow::Owned(
591 Resource::builder()
592 .with_attributes(fastrace_resource_attrs)
593 .build(),
594 ),
595 InstrumentationScope::builder("opentelemetry-instrumentation-foyer").build(),
596 );
597 fastrace::set_reporter(reporter, fastrace::collector::Config::default());
598 tracing::info!("opentelemetry exporter for fastrace is set at {endpoint}");
599 }
600
601 {
603 let filter = filter::Targets::new().with_target("aws_smithy_client::retry", Level::DEBUG);
604
605 layers.push(Box::new(MetricsLayer::new().with_filter(filter)));
606 }
607 tracing_subscriber::registry().with(layers).init();
608 }
610
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `input` and flattens the attributes into owned `(key, value)` pairs.
    fn parse_pairs(input: &str) -> Vec<(String, String)> {
        parse_extra_tracing_attributes(input)
            .iter()
            .map(|kv| (kv.key.as_str().to_owned(), kv.value.as_str().into_owned()))
            .collect()
    }

    /// Parses `input` and keeps only the attribute keys, in order.
    fn parse_keys(input: &str) -> Vec<String> {
        parse_extra_tracing_attributes(input)
            .iter()
            .map(|kv| kv.key.as_str().to_owned())
            .collect()
    }

    /// Owned form of an expected pair list, comparable with `parse_pairs`.
    fn pairs(expected: &[(&str, &str)]) -> Vec<(String, String)> {
        expected
            .iter()
            .map(|&(k, v)| (k.to_owned(), v.to_owned()))
            .collect()
    }

    #[test]
    fn test_parse_extra_tracing_attributes_normal() {
        assert_eq!(
            parse_pairs("cluster=prod,region=us-east-1,namespace=terry-dev"),
            pairs(&[
                ("cluster", "prod"),
                ("region", "us-east-1"),
                ("namespace", "terry-dev"),
            ]),
        );
    }

    #[test]
    fn test_parse_extra_tracing_attributes_whitespace() {
        assert_eq!(
            parse_pairs(" key1 = val1 , key2=val2 "),
            pairs(&[("key1", "val1"), ("key2", "val2")]),
        );
    }

    #[test]
    fn test_parse_extra_tracing_attributes_empty() {
        for input in ["", " "] {
            assert!(parse_pairs(input).is_empty(), "input {input:?}");
        }
    }

    #[test]
    fn test_parse_extra_tracing_attributes_invalid_pairs() {
        // `badpair` has no '=' and is dropped.
        assert_eq!(
            parse_keys("good=value,badpair,also_good=123"),
            ["good", "also_good"]
        );
    }

    #[test]
    fn test_parse_extra_tracing_attributes_empty_key() {
        // `=value` has an empty key and is dropped.
        assert_eq!(parse_keys("=value,key=val"), ["key"]);
    }

    #[test]
    fn test_parse_extra_tracing_attributes_value_with_equals() {
        // Only the first '=' separates key and value.
        assert_eq!(parse_pairs("expr=a=b"), pairs(&[("expr", "a=b")]));
    }

    #[test]
    fn test_parse_extra_tracing_attributes_trailing_comma() {
        assert_eq!(parse_keys("k1=v1,k2=v2,").len(), 2);
    }

    #[test]
    fn test_parse_extra_tracing_attributes_empty_value() {
        assert_eq!(parse_pairs("key="), pairs(&[("key", "")]));
    }
}
683}