1use std::borrow::Cow;
16use std::env;
17use std::path::PathBuf;
18
19use either::Either;
20use fastrace_opentelemetry::OpenTelemetryReporter;
21use opentelemetry::InstrumentationScope;
22use opentelemetry::trace::{SpanKind, TracerProvider};
23use opentelemetry_otlp::SpanExporter;
24use opentelemetry_sdk::Resource;
25use opentelemetry_sdk::trace::TracerProviderBuilder;
26use risingwave_common::metrics::MetricsLayer;
27use risingwave_common::util::deployment::Deployment;
28use risingwave_common::util::env_var::env_var_is_true;
29use risingwave_common::util::query_log::*;
30use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn;
31use thiserror_ext::AsReport;
32use tracing::level_filters::LevelFilter as Level;
33use tracing_subscriber::filter::Targets;
34use tracing_subscriber::fmt::FormatFields;
35use tracing_subscriber::fmt::format::DefaultFields;
36use tracing_subscriber::fmt::time::OffsetTime;
37use tracing_subscriber::layer::SubscriberExt;
38use tracing_subscriber::prelude::*;
39use tracing_subscriber::{EnvFilter, filter, reload};
40
41pub struct LoggerSettings {
42 name: String,
44 enable_tokio_console: bool,
46 colorful: bool,
48 stderr: bool,
50 with_thread_name: bool,
52 targets: Vec<(String, tracing::metadata::LevelFilter)>,
54 default_level: Option<tracing::metadata::LevelFilter>,
56 tracing_endpoint: Option<String>,
58}
59
60impl Default for LoggerSettings {
61 fn default() -> Self {
62 Self::new("risingwave")
63 }
64}
65
66impl LoggerSettings {
67 pub fn from_opts<O: risingwave_common::opts::Opts>(opts: &O) -> Self {
73 let mut settings = Self::new(O::name());
74 if settings.tracing_endpoint.is_none() && let Some(addr) = opts.meta_addr().exactly_one()
76 {
78 settings.tracing_endpoint = Some(addr.to_string());
82 }
83 settings
84 }
85
86 pub fn new(name: impl Into<String>) -> Self {
88 Self {
89 name: name.into(),
90 enable_tokio_console: false,
91 colorful: console::colors_enabled_stderr() && console::colors_enabled(),
92 stderr: false,
93 with_thread_name: false,
94 targets: vec![],
95 default_level: None,
96 tracing_endpoint: std::env::var("RW_TRACING_ENDPOINT").ok(),
97 }
98 }
99
100 pub fn tokio_console(mut self, enabled: bool) -> Self {
102 self.enable_tokio_console = enabled;
103 self
104 }
105
106 pub fn stderr(mut self, enabled: bool) -> Self {
108 self.stderr = enabled;
109 self
110 }
111
112 pub fn with_thread_name(mut self, enabled: bool) -> Self {
114 self.with_thread_name = enabled;
115 self
116 }
117
118 pub fn with_target(
120 mut self,
121 target: impl Into<String>,
122 level: impl Into<tracing::metadata::LevelFilter>,
123 ) -> Self {
124 self.targets.push((target.into(), level.into()));
125 self
126 }
127
128 pub fn with_default(mut self, level: impl Into<tracing::metadata::LevelFilter>) -> Self {
130 self.default_level = Some(level.into());
131 self
132 }
133
134 pub fn with_tracing_endpoint(mut self, endpoint: impl Into<String>) -> Self {
136 self.tracing_endpoint = Some(endpoint.into());
137 self
138 }
139}
140
141fn disabled_filter() -> filter::Targets {
143 filter::Targets::new()
144}
145
146pub fn init_risingwave_logger(settings: LoggerSettings) {
193 let deployment = Deployment::current();
194
195 let default_timer = OffsetTime::local_rfc_3339().unwrap_or_else(|e| {
197 println!(
198 "failed to get local time offset, falling back to UTC: {}",
199 e.as_report()
200 );
201 OffsetTime::new(
202 time::UtcOffset::UTC,
203 time::format_description::well_known::Rfc3339,
204 )
205 });
206
207 let default_filter = {
209 let mut filter = filter::Targets::new();
210
211 filter = filter
214 .with_target("auto_schema_change", Level::INFO)
215 .with_target("risingwave_sqlparser", Level::INFO)
216 .with_target("risingwave_connector_node", Level::INFO)
217 .with_target("pgwire", Level::INFO)
218 .with_target(PGWIRE_QUERY_LOG, Level::OFF)
219 .with_target("events", Level::OFF);
221
222 filter = filter
224 .with_target("foyer", Level::INFO)
225 .with_target("aws", Level::INFO)
226 .with_target("aws_config", Level::WARN)
227 .with_target("aws_endpoint", Level::WARN)
228 .with_target("aws_credential_types::cache::lazy_caching", Level::WARN)
229 .with_target("hyper", Level::WARN)
230 .with_target("h2", Level::WARN)
231 .with_target("tower", Level::WARN)
232 .with_target("tonic", Level::WARN)
233 .with_target("isahc", Level::WARN)
234 .with_target("console_subscriber", Level::WARN)
235 .with_target("reqwest", Level::WARN)
236 .with_target("sled", Level::INFO)
237 .with_target("cranelift", Level::INFO)
238 .with_target("wasmtime", Level::INFO)
239 .with_target("sqlx", Level::WARN)
240 .with_target("opendal", Level::INFO)
241 .with_target("reqsign", Level::INFO);
242
243 let default_level = match deployment {
245 Deployment::Ci => Level::INFO,
246 _ => {
247 if cfg!(debug_assertions) {
248 Level::DEBUG
249 } else {
250 Level::INFO
251 }
252 }
253 };
254 filter = filter.with_default(default_level);
255
256 filter = filter.with_targets(settings.targets);
258 if let Some(default_level) = settings.default_level {
259 filter = filter.with_default(default_level);
260 }
261
262 if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV)
264 && !rust_log.is_empty()
265 {
266 let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`");
267 if let Some(default_level) = rust_log_targets.default_level() {
268 filter = filter.with_default(default_level);
269 }
270 filter = filter.with_targets(rust_log_targets)
271 };
272
273 filter
274 };
275
276 let mut layers = vec![];
277
278 {
280 let fmt_layer = tracing_subscriber::fmt::layer()
281 .with_thread_names(settings.with_thread_name)
282 .with_timer(default_timer.clone())
283 .with_ansi(settings.colorful)
284 .with_writer(move || {
285 if settings.stderr {
286 Either::Left(std::io::stderr())
287 } else {
288 Either::Right(std::io::stdout())
289 }
290 });
291
292 let fmt_layer = match deployment {
293 Deployment::Ci => fmt_layer.compact().boxed(),
294 Deployment::Cloud => fmt_layer
295 .json()
296 .map_event_format(|e| e.with_current_span(false)) .boxed(),
298 Deployment::Other => {
299 if env_var_is_true("ENABLE_PRETTY_LOG") {
300 fmt_layer.pretty().boxed()
301 } else {
302 fmt_layer.boxed()
303 }
304 }
305 };
306
307 layers.push(
308 fmt_layer
309 .with_filter(default_filter.clone().with_target("rw_tracing", Level::OFF)) .boxed(),
311 );
312 };
313
314 let query_log_path = std::env::var("RW_QUERY_LOG_PATH");
316 if let Ok(query_log_path) = query_log_path {
317 let query_log_path = PathBuf::from(query_log_path);
318 std::fs::create_dir_all(query_log_path.clone()).unwrap_or_else(|e| {
319 panic!(
320 "failed to create directory '{}' for query log: {}",
321 query_log_path.display(),
322 e.as_report(),
323 )
324 });
325
326 #[derive(Default)]
341 struct FmtFields<const SLOW: bool>(DefaultFields);
342
343 impl<'writer, const SLOW: bool> FormatFields<'writer> for FmtFields<SLOW> {
344 fn format_fields<R: tracing_subscriber::field::RecordFields>(
345 &self,
346 writer: tracing_subscriber::fmt::format::Writer<'writer>,
347 fields: R,
348 ) -> std::fmt::Result {
349 self.0.format_fields(writer, fields)
350 }
351 }
352
353 for (file_name, target, is_slow) in [
354 ("query.log", PGWIRE_QUERY_LOG, false),
355 ("slow_query.log", PGWIRE_SLOW_QUERY_LOG, true),
356 ] {
357 let path = query_log_path.join(file_name);
358
359 let file = std::fs::OpenOptions::new()
360 .create(true)
361 .write(true)
362 .truncate(true)
363 .open(&path)
364 .unwrap_or_else(|e| {
365 panic!("failed to create `{}`: {}", path.display(), e.as_report(),)
366 });
367
368 let layer = tracing_subscriber::fmt::layer()
369 .with_ansi(false)
370 .with_level(false)
371 .with_file(false)
372 .with_target(false)
373 .with_timer(default_timer.clone())
374 .with_thread_names(true)
375 .with_thread_ids(true)
376 .with_writer(file);
377
378 let layer = match is_slow {
379 true => layer.fmt_fields(FmtFields::<true>::default()).boxed(),
380 false => layer.fmt_fields(FmtFields::<false>::default()).boxed(),
381 };
382
383 let layer = layer.with_filter(
384 filter::Targets::new()
385 .with_target(PGWIRE_ROOT_SPAN_TARGET, Level::INFO)
387 .with_target(target, Level::INFO),
388 );
389
390 layers.push(layer.boxed());
391 }
392 }
393
394 if settings.enable_tokio_console {
395 let (console_layer, server) = console_subscriber::ConsoleLayer::builder()
396 .with_default_env()
397 .build();
398 let console_layer = console_layer.with_filter(
399 filter::Targets::new()
400 .with_target("tokio", Level::TRACE)
401 .with_target("runtime", Level::TRACE),
402 );
403 layers.push(console_layer.boxed());
404 std::thread::spawn(|| {
405 tokio::runtime::Builder::new_current_thread()
406 .enable_all()
407 .build()
408 .unwrap()
409 .block_on(async move {
410 println!("serving console subscriber");
411 server.serve().await.unwrap();
412 });
413 });
414 };
415
416 #[cfg(not(madsim))]
418 if let Some(endpoint) = settings.tracing_endpoint {
419 println!("opentelemetry tracing will be exported to `{endpoint}` if enabled");
420
421 use opentelemetry::KeyValue;
422 use opentelemetry_otlp::WithExportConfig;
423 use opentelemetry_semantic_conventions::resource;
424
425 let id = format!(
426 "{}-{}",
427 hostname::get()
428 .ok()
429 .and_then(|o| o.into_string().ok())
430 .unwrap_or_default(),
431 std::process::id()
432 );
433
434 let (otel_tracer, exporter) = {
435 let runtime = tokio::runtime::Builder::new_multi_thread()
436 .enable_all()
437 .thread_name("rw-otel")
438 .worker_threads(2)
439 .build()
440 .unwrap();
441 let runtime = Box::leak(Box::new(runtime));
442
443 let _entered = runtime.enter();
445
446 let service_name = format!("{}-{}", settings.name, id);
449 let otel_tracer = TracerProviderBuilder::default()
450 .with_batch_exporter(
451 SpanExporter::builder()
452 .with_tonic()
453 .with_endpoint(&endpoint)
454 .build()
455 .unwrap(),
456 )
457 .with_resource(
458 Resource::builder()
459 .with_attributes([
460 KeyValue::new(resource::SERVICE_NAME, service_name.clone()),
461 KeyValue::new(resource::SERVICE_INSTANCE_ID, id.clone()),
462 KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
463 KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
464 ])
465 .build(),
466 )
467 .build()
468 .tracer(service_name);
469
470 let exporter = SpanExporter::builder()
471 .with_tonic()
472 .with_endpoint(&endpoint)
473 .with_protocol(opentelemetry_otlp::Protocol::Grpc)
474 .with_timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
475 .build()
476 .unwrap();
477
478 (otel_tracer, exporter)
479 };
480
481 let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter());
485
486 set_toggle_otel_layer_fn(move |enabled: bool| {
487 let result = reload_handle.modify(|f| {
488 *f = if enabled {
489 default_filter.clone()
490 } else {
491 disabled_filter()
492 }
493 });
494
495 match result {
496 Ok(_) => tracing::info!(
497 "opentelemetry tracing {}",
498 if enabled { "enabled" } else { "disabled" },
499 ),
500
501 Err(error) => tracing::error!(
502 error = %error.as_report(),
503 "failed to {} opentelemetry tracing",
504 if enabled { "enable" } else { "disable" },
505 ),
506 }
507 });
508
509 let layer = tracing_opentelemetry::layer()
510 .with_tracer(otel_tracer)
511 .with_filter(reload_filter);
512
513 layers.push(layer.boxed());
514
515 let reporter = OpenTelemetryReporter::new(
525 exporter,
526 SpanKind::Server,
527 Cow::Owned(
528 Resource::builder()
529 .with_service_name(format!("fastrace-{id}"))
530 .build(),
531 ),
532 InstrumentationScope::builder("opentelemetry-instrumentation-foyer").build(),
533 );
534 fastrace::set_reporter(reporter, fastrace::collector::Config::default());
535 tracing::info!("opentelemetry exporter for fastrace is set at {endpoint}");
536 }
537
538 {
540 let filter = filter::Targets::new().with_target("aws_smithy_client::retry", Level::DEBUG);
541
542 layers.push(Box::new(MetricsLayer::new().with_filter(filter)));
543 }
544 tracing_subscriber::registry().with(layers).init();
545 }