1use std::borrow::Cow;
16use std::env;
17use std::path::PathBuf;
18
19use either::Either;
20use fastrace_opentelemetry::OpenTelemetryReporter;
21use opentelemetry::InstrumentationScope;
22use opentelemetry::trace::{SpanKind, TracerProvider};
23use opentelemetry_otlp::SpanExporter;
24use opentelemetry_sdk::Resource;
25use opentelemetry_sdk::trace::TracerProviderBuilder;
26use risingwave_common::metrics::MetricsLayer;
27use risingwave_common::util::deployment::Deployment;
28use risingwave_common::util::env_var::env_var_is_true;
29use risingwave_common::util::query_log::*;
30use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn;
31use thiserror_ext::AsReport;
32use tracing::level_filters::LevelFilter as Level;
33use tracing_subscriber::filter::Targets;
34use tracing_subscriber::fmt::FormatFields;
35use tracing_subscriber::fmt::format::DefaultFields;
36use tracing_subscriber::fmt::time::OffsetTime;
37use tracing_subscriber::layer::SubscriberExt;
38use tracing_subscriber::prelude::*;
39use tracing_subscriber::{EnvFilter, filter, reload};
40
41pub struct LoggerSettings {
42 name: String,
44 enable_tokio_console: bool,
46 colorful: bool,
48 stderr: bool,
50 with_thread_name: bool,
52 targets: Vec<(String, tracing::metadata::LevelFilter)>,
54 default_level: Option<tracing::metadata::LevelFilter>,
56 tracing_endpoint: Option<String>,
58}
59
60impl Default for LoggerSettings {
61 fn default() -> Self {
62 Self::new("risingwave")
63 }
64}
65
66impl LoggerSettings {
67 pub fn from_opts<O: risingwave_common::opts::Opts>(opts: &O) -> Self {
73 let mut settings = Self::new(O::name());
74 if settings.tracing_endpoint.is_none() && let Some(addr) = opts.meta_addr().exactly_one()
76 {
78 settings.tracing_endpoint = Some(addr.to_string());
82 }
83 settings
84 }
85
86 pub fn new(name: impl Into<String>) -> Self {
88 Self {
89 name: name.into(),
90 enable_tokio_console: false,
91 colorful: console::colors_enabled_stderr() && console::colors_enabled(),
92 stderr: false,
93 with_thread_name: false,
94 targets: vec![],
95 default_level: None,
96 tracing_endpoint: std::env::var("RW_TRACING_ENDPOINT").ok(),
97 }
98 }
99
100 pub fn tokio_console(mut self, enabled: bool) -> Self {
102 self.enable_tokio_console = enabled;
103 self
104 }
105
106 pub fn stderr(mut self, enabled: bool) -> Self {
108 self.stderr = enabled;
109 self
110 }
111
112 pub fn with_thread_name(mut self, enabled: bool) -> Self {
114 self.with_thread_name = enabled;
115 self
116 }
117
118 pub fn with_target(
120 mut self,
121 target: impl Into<String>,
122 level: impl Into<tracing::metadata::LevelFilter>,
123 ) -> Self {
124 self.targets.push((target.into(), level.into()));
125 self
126 }
127
128 pub fn with_default(mut self, level: impl Into<tracing::metadata::LevelFilter>) -> Self {
130 self.default_level = Some(level.into());
131 self
132 }
133
134 pub fn with_tracing_endpoint(mut self, endpoint: impl Into<String>) -> Self {
136 self.tracing_endpoint = Some(endpoint.into());
137 self
138 }
139}
140
141fn disabled_filter() -> filter::Targets {
143 filter::Targets::new()
144}
145
146pub fn init_risingwave_logger(settings: LoggerSettings) {
193 let deployment = Deployment::current();
194
195 let default_timer = OffsetTime::local_rfc_3339().unwrap_or_else(|e| {
197 println!(
198 "failed to get local time offset, falling back to UTC: {}",
199 e.as_report()
200 );
201 OffsetTime::new(
202 time::UtcOffset::UTC,
203 time::format_description::well_known::Rfc3339,
204 )
205 });
206
207 let default_filter = {
209 let mut filter = filter::Targets::new();
210
211 filter = filter
214 .with_target("auto_schema_change", Level::INFO)
215 .with_target("risingwave_sqlparser", Level::INFO)
216 .with_target("risingwave_connector_node", Level::INFO)
217 .with_target("pgwire", Level::INFO)
218 .with_target(PGWIRE_QUERY_LOG, Level::OFF)
219 .with_target("events", Level::OFF);
221
222 filter = filter
225 .with_target("foyer", Level::INFO)
226 .with_target("aws", Level::INFO)
227 .with_target("aws_config", Level::WARN)
228 .with_target("aws_endpoint", Level::WARN)
229 .with_target("aws_credential_types::cache::lazy_caching", Level::WARN)
230 .with_target("hyper", Level::WARN)
231 .with_target("h2", Level::WARN)
232 .with_target("tower", Level::WARN)
233 .with_target("tonic", Level::WARN)
234 .with_target("isahc", Level::WARN)
235 .with_target("console_subscriber", Level::WARN)
236 .with_target("reqwest", Level::WARN)
237 .with_target("sled", Level::INFO)
238 .with_target("cranelift", Level::INFO)
239 .with_target("wasmtime", Level::INFO)
240 .with_target("sqlx", Level::WARN)
241 .with_target("opendal", Level::INFO)
242 .with_target("reqsign", Level::INFO)
243 .with_target("jni", Level::INFO)
244 .with_target("async_nats", Level::WARN);
245
246 let default_level = match deployment {
248 Deployment::Ci => Level::INFO,
249 _ => {
250 if cfg!(debug_assertions) {
251 Level::DEBUG
252 } else {
253 Level::INFO
254 }
255 }
256 };
257 filter = filter.with_default(default_level);
258
259 filter = filter.with_targets(settings.targets);
261 if let Some(default_level) = settings.default_level {
262 filter = filter.with_default(default_level);
263 }
264
265 if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV)
267 && !rust_log.is_empty()
268 {
269 let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`");
270 if let Some(default_level) = rust_log_targets.default_level() {
271 filter = filter.with_default(default_level);
272 }
273 filter = filter.with_targets(rust_log_targets)
274 };
275
276 filter
277 };
278
279 let mut layers = vec![];
280
281 {
283 let fmt_layer = tracing_subscriber::fmt::layer()
284 .with_thread_names(settings.with_thread_name)
285 .with_timer(default_timer.clone())
286 .with_ansi(settings.colorful)
287 .with_writer(move || {
288 if settings.stderr {
289 Either::Left(std::io::stderr())
290 } else {
291 Either::Right(std::io::stdout())
292 }
293 });
294
295 let fmt_layer = match deployment {
296 Deployment::Ci => fmt_layer.compact().boxed(),
297 Deployment::Cloud => fmt_layer
298 .json()
299 .map_event_format(|e| e.with_current_span(false)) .boxed(),
301 Deployment::Other => {
302 if env_var_is_true("ENABLE_PRETTY_LOG") {
303 fmt_layer.pretty().boxed()
304 } else {
305 fmt_layer.boxed()
306 }
307 }
308 };
309
310 layers.push(
311 fmt_layer
312 .with_filter(default_filter.clone().with_target("rw_tracing", Level::OFF)) .boxed(),
314 );
315 };
316
317 let query_log_path = std::env::var("RW_QUERY_LOG_PATH");
319 if let Ok(query_log_path) = query_log_path {
320 let query_log_path = PathBuf::from(query_log_path);
321 std::fs::create_dir_all(query_log_path.clone()).unwrap_or_else(|e| {
322 panic!(
323 "failed to create directory '{}' for query log: {}",
324 query_log_path.display(),
325 e.as_report(),
326 )
327 });
328
329 #[derive(Default)]
344 struct FmtFields<const SLOW: bool>(DefaultFields);
345
346 impl<'writer, const SLOW: bool> FormatFields<'writer> for FmtFields<SLOW> {
347 fn format_fields<R: tracing_subscriber::field::RecordFields>(
348 &self,
349 writer: tracing_subscriber::fmt::format::Writer<'writer>,
350 fields: R,
351 ) -> std::fmt::Result {
352 self.0.format_fields(writer, fields)
353 }
354 }
355
356 for (file_name, target, is_slow) in [
357 ("query.log", PGWIRE_QUERY_LOG, false),
358 ("slow_query.log", PGWIRE_SLOW_QUERY_LOG, true),
359 ] {
360 let path = query_log_path.join(file_name);
361
362 let file = std::fs::OpenOptions::new()
363 .create(true)
364 .write(true)
365 .truncate(true)
366 .open(&path)
367 .unwrap_or_else(|e| {
368 panic!("failed to create `{}`: {}", path.display(), e.as_report(),)
369 });
370
371 let layer = tracing_subscriber::fmt::layer()
372 .with_ansi(false)
373 .with_level(false)
374 .with_file(false)
375 .with_target(false)
376 .with_timer(default_timer.clone())
377 .with_thread_names(true)
378 .with_thread_ids(true)
379 .with_writer(file);
380
381 let layer = match is_slow {
382 true => layer.fmt_fields(FmtFields::<true>::default()).boxed(),
383 false => layer.fmt_fields(FmtFields::<false>::default()).boxed(),
384 };
385
386 let layer = layer.with_filter(
387 filter::Targets::new()
388 .with_target(PGWIRE_ROOT_SPAN_TARGET, Level::INFO)
390 .with_target(target, Level::INFO),
391 );
392
393 layers.push(layer.boxed());
394 }
395 }
396
397 if settings.enable_tokio_console {
398 let (console_layer, server) = console_subscriber::ConsoleLayer::builder()
399 .with_default_env()
400 .build();
401 let console_layer = console_layer.with_filter(
402 filter::Targets::new()
403 .with_target("tokio", Level::TRACE)
404 .with_target("runtime", Level::TRACE),
405 );
406 layers.push(console_layer.boxed());
407 std::thread::spawn(|| {
408 tokio::runtime::Builder::new_current_thread()
409 .enable_all()
410 .build()
411 .unwrap()
412 .block_on(async move {
413 println!("serving console subscriber");
414 server.serve().await.unwrap();
415 });
416 });
417 };
418
419 #[cfg(not(madsim))]
421 if let Some(endpoint) = settings.tracing_endpoint {
422 println!("opentelemetry tracing will be exported to `{endpoint}` if enabled");
423
424 use opentelemetry::KeyValue;
425 use opentelemetry_otlp::WithExportConfig;
426 use opentelemetry_semantic_conventions::resource;
427
428 let id = format!(
429 "{}-{}",
430 hostname::get()
431 .ok()
432 .and_then(|o| o.into_string().ok())
433 .unwrap_or_default(),
434 std::process::id()
435 );
436
437 let (otel_tracer, exporter) = {
438 let runtime = tokio::runtime::Builder::new_multi_thread()
439 .enable_all()
440 .thread_name("rw-otel")
441 .worker_threads(2)
442 .build()
443 .unwrap();
444 let runtime = Box::leak(Box::new(runtime));
445
446 let _entered = runtime.enter();
448
449 let service_name = format!("{}-{}", settings.name, id);
452 let otel_tracer = TracerProviderBuilder::default()
453 .with_batch_exporter(
454 SpanExporter::builder()
455 .with_tonic()
456 .with_endpoint(&endpoint)
457 .build()
458 .unwrap(),
459 )
460 .with_resource(
461 Resource::builder()
462 .with_attributes([
463 KeyValue::new(resource::SERVICE_NAME, service_name.clone()),
464 KeyValue::new(resource::SERVICE_INSTANCE_ID, id.clone()),
465 KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
466 KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
467 ])
468 .build(),
469 )
470 .build()
471 .tracer(service_name);
472
473 let exporter = SpanExporter::builder()
474 .with_tonic()
475 .with_endpoint(&endpoint)
476 .with_protocol(opentelemetry_otlp::Protocol::Grpc)
477 .with_timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
478 .build()
479 .unwrap();
480
481 (otel_tracer, exporter)
482 };
483
484 let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter());
488
489 set_toggle_otel_layer_fn(move |enabled: bool| {
490 let result = reload_handle.modify(|f| {
491 *f = if enabled {
492 default_filter.clone()
493 } else {
494 disabled_filter()
495 }
496 });
497
498 match result {
499 Ok(_) => tracing::info!(
500 "opentelemetry tracing {}",
501 if enabled { "enabled" } else { "disabled" },
502 ),
503
504 Err(error) => tracing::error!(
505 error = %error.as_report(),
506 "failed to {} opentelemetry tracing",
507 if enabled { "enable" } else { "disable" },
508 ),
509 }
510 });
511
512 let layer = tracing_opentelemetry::layer()
513 .with_tracer(otel_tracer)
514 .with_filter(reload_filter);
515
516 layers.push(layer.boxed());
517
518 let reporter = OpenTelemetryReporter::new(
528 exporter,
529 SpanKind::Server,
530 Cow::Owned(
531 Resource::builder()
532 .with_service_name(format!("fastrace-{id}"))
533 .build(),
534 ),
535 InstrumentationScope::builder("opentelemetry-instrumentation-foyer").build(),
536 );
537 fastrace::set_reporter(reporter, fastrace::collector::Config::default());
538 tracing::info!("opentelemetry exporter for fastrace is set at {endpoint}");
539 }
540
541 {
543 let filter = filter::Targets::new().with_target("aws_smithy_client::retry", Level::DEBUG);
544
545 layers.push(Box::new(MetricsLayer::new().with_filter(filter)));
546 }
547 tracing_subscriber::registry().with(layers).init();
548 }