risingwave_rt/
logger.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::env;
17use std::path::PathBuf;
18
19use either::Either;
20use fastrace_opentelemetry::OpenTelemetryReporter;
21use opentelemetry::InstrumentationScope;
22use opentelemetry::trace::TracerProvider;
23use opentelemetry_otlp::SpanExporter;
24use opentelemetry_sdk::Resource;
25use opentelemetry_sdk::trace::TracerProviderBuilder;
26use risingwave_common::metrics::MetricsLayer;
27use risingwave_common::util::deployment::Deployment;
28use risingwave_common::util::env_var::env_var_is_true;
29use risingwave_common::util::query_log::*;
30use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn;
31use thiserror_ext::AsReport;
32use tracing::level_filters::LevelFilter as Level;
33use tracing_subscriber::filter::Targets;
34use tracing_subscriber::fmt::FormatFields;
35use tracing_subscriber::fmt::format::DefaultFields;
36use tracing_subscriber::fmt::time::OffsetTime;
37use tracing_subscriber::layer::SubscriberExt;
38use tracing_subscriber::prelude::*;
39use tracing_subscriber::{EnvFilter, filter, reload};
40
41/// Parse a comma-separated list of `key=value` pairs into `OpenTelemetry` `KeyValue` attributes.
42///
43/// - Splits by comma to get pairs.
44/// - Each pair must contain exactly one `=`.
45/// - Trims whitespace around key and value.
46/// - Ignores invalid pairs but logs a warning.
47/// - Returns an empty vec for empty or unset input.
48fn parse_extra_tracing_attributes(input: &str) -> Vec<opentelemetry::KeyValue> {
49    use opentelemetry::KeyValue;
50
51    if input.trim().is_empty() {
52        return vec![];
53    }
54
55    input
56        .split(',')
57        .filter_map(|pair| {
58            let pair = pair.trim();
59            if pair.is_empty() {
60                return None;
61            }
62            let parts: Vec<&str> = pair.splitn(2, '=').collect();
63            if parts.len() != 2 {
64                eprintln!("warning: ignoring invalid tracing attribute pair (no '='): {pair:?}");
65                return None;
66            }
67            let key = parts[0].trim();
68            let value = parts[1].trim();
69            if key.is_empty() {
70                eprintln!("warning: ignoring tracing attribute with empty key: {pair:?}");
71                return None;
72            }
73            Some(KeyValue::new(key.to_owned(), value.to_owned()))
74        })
75        .collect()
76}
77
78pub struct LoggerSettings {
79    /// The name of the service. Used to identify the service in distributed tracing.
80    name: String,
81    /// Enable tokio console output.
82    enable_tokio_console: bool,
83    /// Enable colorful output in console.
84    colorful: bool,
85    /// Output to `stderr` instead of `stdout`.
86    stderr: bool,
87    /// Whether to include thread name in the log.
88    with_thread_name: bool,
89    /// Override target settings.
90    targets: Vec<(String, tracing::metadata::LevelFilter)>,
91    /// Override the default level.
92    default_level: Option<tracing::metadata::LevelFilter>,
93    /// The endpoint of the tracing collector in OTLP gRPC protocol.
94    tracing_endpoint: Option<String>,
95    /// Extra key/value attributes to attach to `OpenTelemetry` trace resources.
96    extra_tracing_attributes: Vec<opentelemetry::KeyValue>,
97}
98
99impl Default for LoggerSettings {
100    fn default() -> Self {
101        Self::new("risingwave")
102    }
103}
104
105impl LoggerSettings {
106    /// Create a new logger settings from the given command-line options.
107    ///
108    /// If env var `RW_TRACING_ENDPOINT` is not set, the meta address will be used
109    /// as the default tracing endpoint, which means that the embedded tracing
110    /// collector will be used.
111    pub fn from_opts<O: risingwave_common::opts::Opts>(opts: &O) -> Self {
112        let mut settings = Self::new(O::name());
113        if settings.tracing_endpoint.is_none() // no explicit endpoint
114            && let Some(addr) = opts.meta_addr().exactly_one()
115        // meta address is valid
116        {
117            // Use embedded collector in the meta service.
118            // TODO: when there's multiple meta nodes for high availability, we may send
119            // to a wrong node here.
120            settings.tracing_endpoint = Some(addr.to_string());
121        }
122        settings
123    }
124
125    /// Create a new logger settings with the given service name.
126    pub fn new(name: impl Into<String>) -> Self {
127        Self {
128            name: name.into(),
129            enable_tokio_console: false,
130            colorful: console::colors_enabled_stderr() && console::colors_enabled(),
131            stderr: false,
132            with_thread_name: false,
133            targets: vec![],
134            default_level: None,
135            tracing_endpoint: std::env::var("RW_TRACING_ENDPOINT").ok(),
136            extra_tracing_attributes: std::env::var("RW_TRACING_EXTRA_ATTRIBUTES")
137                .ok()
138                .map(|v| parse_extra_tracing_attributes(&v))
139                .unwrap_or_default(),
140        }
141    }
142
143    /// Enable tokio console output.
144    pub fn tokio_console(mut self, enabled: bool) -> Self {
145        self.enable_tokio_console = enabled;
146        self
147    }
148
149    /// Output to `stderr` instead of `stdout`.
150    pub fn stderr(mut self, enabled: bool) -> Self {
151        self.stderr = enabled;
152        self
153    }
154
155    /// Whether to include thread name in the log.
156    pub fn with_thread_name(mut self, enabled: bool) -> Self {
157        self.with_thread_name = enabled;
158        self
159    }
160
161    /// Overrides the default target settings.
162    pub fn with_target(
163        mut self,
164        target: impl Into<String>,
165        level: impl Into<tracing::metadata::LevelFilter>,
166    ) -> Self {
167        self.targets.push((target.into(), level.into()));
168        self
169    }
170
171    /// Overrides the default level.
172    pub fn with_default(mut self, level: impl Into<tracing::metadata::LevelFilter>) -> Self {
173        self.default_level = Some(level.into());
174        self
175    }
176
177    /// Overrides the tracing endpoint.
178    pub fn with_tracing_endpoint(mut self, endpoint: impl Into<String>) -> Self {
179        self.tracing_endpoint = Some(endpoint.into());
180        self
181    }
182}
183
184/// Create a filter that disables all events or spans.
185fn disabled_filter() -> filter::Targets {
186    filter::Targets::new()
187}
188
189/// Init logger for RisingWave binaries.
190///
191/// ## Environment variables to configure logger dynamically
192///
193/// ### `RUST_LOG`
194///
195/// Overrides default level and tracing targets of the fmt layer (formatting and
196/// logging to `stdout` or `stderr`).
197///
198/// Note that only verbosity levels below or equal to `DEBUG` are effective in
199/// release builds.
200///
201/// e.g.,
202/// ```bash
203/// RUST_LOG="info,risingwave_stream=debug,events=debug"
204/// ```
205///
206/// ### `RW_QUERY_LOG_PATH`
207///
208/// Configures the path to generate query log.
209///
210/// If it is set,
211/// - Dump logs of all SQLs, i.e., tracing target [`PGWIRE_QUERY_LOG`] to
212///   `RW_QUERY_LOG_PATH/query.log`.
213/// - Dump slow queries, i.e., tracing target [`PGWIRE_SLOW_QUERY_LOG`] to
214///   `RW_QUERY_LOG_PATH/slow_query.log`.
215///
216/// Note:
217/// To enable query log in the fmt layer (slow query is included by default), set
218/// ```bash
219/// RUST_LOG="pgwire_query_log=info"
220/// ```
221///
222/// `RW_QUERY_LOG_TRUNCATE_LEN` configures the max length of the SQLs logged in the query log,
223/// to avoid the log file growing too large. The default value is 1024 in production.
224///
225/// ### `ENABLE_PRETTY_LOG`
226///
227/// If it is set to `true`, enable pretty log output, which contains line numbers and prints spans in multiple lines.
228/// This can be helpful for development and debugging.
229///
230/// Hint: Also turn off other uninteresting logs to make the most of the pretty log.
231/// e.g.,
232/// ```bash
233/// RUST_LOG="risingwave_storage::hummock::event_handler=off,batch_execute=off,risingwave_batch::task=off" ENABLE_PRETTY_LOG=true risedev d
234/// ```
235pub fn init_risingwave_logger(settings: LoggerSettings) {
236    let deployment = Deployment::current();
237
238    // Default timer for logging with local time offset.
239    let default_timer = OffsetTime::local_rfc_3339().unwrap_or_else(|e| {
240        println!(
241            "failed to get local time offset, falling back to UTC: {}",
242            e.as_report()
243        );
244        OffsetTime::new(
245            time::UtcOffset::UTC,
246            time::format_description::well_known::Rfc3339,
247        )
248    });
249
250    // Default filter for logging to stdout and tracing.
251    let default_filter = {
252        let mut filter = filter::Targets::new();
253
254        // Configure levels for some RisingWave crates. Can still be overridden by `RUST_LOG`.
255        // Other RisingWave crates like `stream` and `storage` will follow the default level.
256        filter = filter
257            .with_target("auto_schema_change", Level::INFO)
258            .with_target("risingwave_sqlparser", Level::INFO)
259            .with_target("risingwave_connector_node", Level::INFO)
260            .with_target("pgwire", Level::INFO)
261            .with_target(PGWIRE_QUERY_LOG, Level::OFF)
262            // debug-purposed events are disabled unless `RUST_LOG` overrides
263            .with_target("events", Level::OFF);
264
265        // Configure levels for external crates. Can still be overridden by `RUST_LOG`.
266        // Other external crates will follow the default level.
267        filter = filter
268            .with_target("foyer", Level::INFO)
269            .with_target("aws", Level::INFO)
270            .with_target("aws_config", Level::WARN)
271            .with_target("aws_endpoint", Level::WARN)
272            .with_target("aws_credential_types::cache::lazy_caching", Level::WARN)
273            .with_target("hyper", Level::WARN)
274            .with_target("h2", Level::WARN)
275            .with_target("tower", Level::WARN)
276            .with_target("tonic", Level::WARN)
277            .with_target("isahc", Level::WARN)
278            .with_target("console_subscriber", Level::WARN)
279            .with_target("reqwest", Level::WARN)
280            .with_target("sled", Level::INFO)
281            .with_target("cranelift", Level::INFO)
282            .with_target("wasmtime", Level::INFO)
283            .with_target("sqlx", Level::WARN)
284            .with_target("opendal", Level::INFO)
285            .with_target("reqsign", Level::INFO)
286            .with_target("jni", Level::INFO)
287            .with_target("async_nats", Level::WARN);
288
289        // For all other crates, apply default level depending on the deployment and `debug_assertions` flag.
290        let default_level = match deployment {
291            Deployment::Ci => Level::INFO,
292            _ => {
293                if cfg!(debug_assertions) {
294                    Level::DEBUG
295                } else {
296                    Level::INFO
297                }
298            }
299        };
300        filter = filter.with_default(default_level);
301
302        // Overrides from settings.
303        filter = filter.with_targets(settings.targets);
304        if let Some(default_level) = settings.default_level {
305            filter = filter.with_default(default_level);
306        }
307
308        // Overrides from env var.
309        if let Ok(rust_log) = std::env::var(EnvFilter::DEFAULT_ENV)
310            && !rust_log.is_empty()
311        {
312            let rust_log_targets: Targets = rust_log.parse().expect("failed to parse `RUST_LOG`");
313            if let Some(default_level) = rust_log_targets.default_level() {
314                filter = filter.with_default(default_level);
315            }
316            filter = filter.with_targets(rust_log_targets)
317        };
318
319        filter
320    };
321
322    let mut layers = vec![];
323
324    // fmt layer (formatting and logging to `stdout` or `stderr`)
325    {
326        let fmt_layer = tracing_subscriber::fmt::layer()
327            .with_thread_names(settings.with_thread_name)
328            .with_timer(default_timer.clone())
329            .with_ansi(settings.colorful)
330            .with_writer(move || {
331                if settings.stderr {
332                    Either::Left(std::io::stderr())
333                } else {
334                    Either::Right(std::io::stdout())
335                }
336            });
337
338        let fmt_layer = match deployment {
339            Deployment::Ci => fmt_layer.compact().boxed(),
340            Deployment::Cloud => fmt_layer
341                .json()
342                .map_event_format(|e| e.with_current_span(false)) // avoid duplication as there's a span list field
343                .boxed(),
344            Deployment::Other => {
345                if env_var_is_true("ENABLE_PRETTY_LOG") {
346                    fmt_layer.pretty().boxed()
347                } else {
348                    fmt_layer.boxed()
349                }
350            }
351        };
352
353        layers.push(
354            fmt_layer
355                .with_filter(default_filter.clone().with_target("rw_tracing", Level::OFF)) // filter-out tracing-only events
356                .boxed(),
357        );
358    };
359
360    // If `RW_QUERY_LOG_PATH` env var is set to a directory, turn on query log files.
361    let query_log_path = std::env::var("RW_QUERY_LOG_PATH");
362    if let Ok(query_log_path) = query_log_path {
363        let query_log_path = PathBuf::from(query_log_path);
364        std::fs::create_dir_all(query_log_path.clone()).unwrap_or_else(|e| {
365            panic!(
366                "failed to create directory '{}' for query log: {}",
367                query_log_path.display(),
368                e.as_report(),
369            )
370        });
371
372        /// Newtype wrapper for `DefaultFields`.
373        ///
374        /// `fmt::Layer` will share the same `FormattedFields` extension for spans across
375        /// different layers, as long as the type of `N: FormatFields` is the same. This
376        /// will cause several problems:
377        ///
378        /// - `with_ansi(false)` does not take effect and it will follow the settings of
379        ///   the primary fmt layer installed above.
380        /// - `Span::record` will update the same `FormattedFields` multiple times,
381        ///   leading to duplicated fields.
382        ///
383        /// As a workaround, we use a newtype wrapper here to get a different type id.
384        /// The const generic parameter `SLOW` is further used to distinguish between the
385        /// query log and the slow query log.
386        #[derive(Default)]
387        struct FmtFields<const SLOW: bool>(DefaultFields);
388
389        impl<'writer, const SLOW: bool> FormatFields<'writer> for FmtFields<SLOW> {
390            fn format_fields<R: tracing_subscriber::field::RecordFields>(
391                &self,
392                writer: tracing_subscriber::fmt::format::Writer<'writer>,
393                fields: R,
394            ) -> std::fmt::Result {
395                self.0.format_fields(writer, fields)
396            }
397        }
398
399        for (file_name, target, is_slow) in [
400            ("query.log", PGWIRE_QUERY_LOG, false),
401            ("slow_query.log", PGWIRE_SLOW_QUERY_LOG, true),
402        ] {
403            let path = query_log_path.join(file_name);
404
405            let file = std::fs::OpenOptions::new()
406                .create(true)
407                .write(true)
408                .truncate(true)
409                .open(&path)
410                .unwrap_or_else(|e| {
411                    panic!("failed to create `{}`: {}", path.display(), e.as_report(),)
412                });
413
414            let layer = tracing_subscriber::fmt::layer()
415                .with_ansi(false)
416                .with_level(false)
417                .with_file(false)
418                .with_target(false)
419                .with_timer(default_timer.clone())
420                .with_thread_names(true)
421                .with_thread_ids(true)
422                .with_writer(file);
423
424            let layer = match is_slow {
425                true => layer.fmt_fields(FmtFields::<true>::default()).boxed(),
426                false => layer.fmt_fields(FmtFields::<false>::default()).boxed(),
427            };
428
429            let layer = layer.with_filter(
430                filter::Targets::new()
431                    // Root span must be enabled to provide common info like the SQL query.
432                    .with_target(PGWIRE_ROOT_SPAN_TARGET, Level::INFO)
433                    .with_target(target, Level::INFO),
434            );
435
436            layers.push(layer.boxed());
437        }
438    }
439
440    if settings.enable_tokio_console {
441        let (console_layer, server) = console_subscriber::ConsoleLayer::builder()
442            .with_default_env()
443            .build();
444        let console_layer = console_layer.with_filter(
445            filter::Targets::new()
446                .with_target("tokio", Level::TRACE)
447                .with_target("runtime", Level::TRACE),
448        );
449        layers.push(console_layer.boxed());
450        std::thread::spawn(|| {
451            tokio::runtime::Builder::new_current_thread()
452                .enable_all()
453                .build()
454                .unwrap()
455                .block_on(async move {
456                    println!("serving console subscriber");
457                    server.serve().await.unwrap();
458                });
459        });
460    };
461
462    // Tracing layer
463    #[cfg(not(madsim))]
464    if let Some(endpoint) = settings.tracing_endpoint {
465        println!("opentelemetry tracing will be exported to `{endpoint}` if enabled");
466
467        use opentelemetry::KeyValue;
468        use opentelemetry_otlp::WithExportConfig;
469        use opentelemetry_semantic_conventions::resource;
470
471        let id = format!(
472            "{}-{}",
473            hostname::get()
474                .ok()
475                .and_then(|o| o.into_string().ok())
476                .unwrap_or_default(),
477            std::process::id()
478        );
479
480        let extra_attributes = &settings.extra_tracing_attributes;
481        if !extra_attributes.is_empty() {
482            println!(
483                "extra tracing resource attributes: {:?}",
484                extra_attributes
485                    .iter()
486                    .map(|kv| format!("{}={}", kv.key, kv.value))
487                    .collect::<Vec<_>>()
488            );
489        }
490
491        let (otel_tracer, exporter) = {
492            let runtime = tokio::runtime::Builder::new_multi_thread()
493                .enable_all()
494                .thread_name("rw-otel")
495                .worker_threads(2)
496                .build()
497                .unwrap();
498            let runtime = Box::leak(Box::new(runtime));
499
500            // Installing the exporter requires a tokio runtime.
501            let _entered = runtime.enter();
502
503            // TODO(bugen): better service name
504            // https://github.com/jaegertracing/jaeger-ui/issues/336
505            let service_name = format!("{}-{}", settings.name, id);
506
507            let mut resource_attrs = vec![
508                KeyValue::new(resource::SERVICE_NAME, service_name.clone()),
509                KeyValue::new(resource::SERVICE_INSTANCE_ID, id.clone()),
510                KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
511                KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
512            ];
513            resource_attrs.extend(extra_attributes.iter().cloned());
514
515            let otel_tracer = TracerProviderBuilder::default()
516                .with_batch_exporter(
517                    SpanExporter::builder()
518                        .with_tonic()
519                        .with_endpoint(&endpoint)
520                        .build()
521                        .unwrap(),
522                )
523                .with_resource(Resource::builder().with_attributes(resource_attrs).build())
524                .build()
525                .tracer(service_name);
526
527            let exporter = SpanExporter::builder()
528                .with_tonic()
529                .with_endpoint(&endpoint)
530                .with_protocol(opentelemetry_otlp::Protocol::Grpc)
531                .with_timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
532                .build()
533                .unwrap();
534
535            (otel_tracer, exporter)
536        };
537
538        // Disable by filtering out all events or spans by default.
539        //
540        // It'll be enabled with `toggle_otel_layer` based on the system parameter `enable_tracing` later.
541        let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter());
542
543        set_toggle_otel_layer_fn(move |enabled: bool| {
544            let result = reload_handle.modify(|f| {
545                *f = if enabled {
546                    default_filter.clone()
547                } else {
548                    disabled_filter()
549                }
550            });
551
552            match result {
553                Ok(_) => tracing::info!(
554                    "opentelemetry tracing {}",
555                    if enabled { "enabled" } else { "disabled" },
556                ),
557
558                Err(error) => tracing::error!(
559                    error = %error.as_report(),
560                    "failed to {} opentelemetry tracing",
561                    if enabled { "enable" } else { "disable" },
562                ),
563            }
564        });
565
566        let layer = tracing_opentelemetry::layer()
567            .with_tracer(otel_tracer)
568            .with_filter(reload_filter);
569
570        layers.push(layer.boxed());
571
572        // The reporter is used by fastrace in foyer for dynamically tail-based tracing.
573        //
574        // Code here only setup the OpenTelemetry reporter. To enable/disable the function, please use risectl.
575        //
576        // e.g.
577        //
578        // ```bash
579        // risectl hummock tiered-cache-tracing -h
580        // ```
581        let mut fastrace_resource_attrs: Vec<opentelemetry::KeyValue> =
582            vec![opentelemetry::KeyValue::new(
583                opentelemetry_semantic_conventions::resource::SERVICE_NAME,
584                format!("fastrace-{id}"),
585            )];
586        fastrace_resource_attrs.extend(extra_attributes.iter().cloned());
587
588        let reporter = OpenTelemetryReporter::new(
589            exporter,
590            Cow::Owned(
591                Resource::builder()
592                    .with_attributes(fastrace_resource_attrs)
593                    .build(),
594            ),
595            InstrumentationScope::builder("opentelemetry-instrumentation-foyer").build(),
596        );
597        fastrace::set_reporter(reporter, fastrace::collector::Config::default());
598        tracing::info!("opentelemetry exporter for fastrace is set at {endpoint}");
599    }
600
601    // Metrics layer
602    {
603        let filter = filter::Targets::new().with_target("aws_smithy_client::retry", Level::DEBUG);
604
605        layers.push(Box::new(MetricsLayer::new().with_filter(filter)));
606    }
607    tracing_subscriber::registry().with(layers).init();
608    // TODO: add file-appender tracing subscriber in the future
609}
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614
615    #[test]
616    fn test_parse_extra_tracing_attributes_normal() {
617        let attrs =
618            parse_extra_tracing_attributes("cluster=prod,region=us-east-1,namespace=terry-dev");
619        assert_eq!(attrs.len(), 3);
620        assert_eq!(attrs[0].key.as_str(), "cluster");
621        assert_eq!(attrs[0].value.as_str(), "prod");
622        assert_eq!(attrs[1].key.as_str(), "region");
623        assert_eq!(attrs[1].value.as_str(), "us-east-1");
624        assert_eq!(attrs[2].key.as_str(), "namespace");
625        assert_eq!(attrs[2].value.as_str(), "terry-dev");
626    }
627
628    #[test]
629    fn test_parse_extra_tracing_attributes_whitespace() {
630        let attrs = parse_extra_tracing_attributes("  key1 = val1 , key2=val2  ");
631        assert_eq!(attrs.len(), 2);
632        assert_eq!(attrs[0].key.as_str(), "key1");
633        assert_eq!(attrs[0].value.as_str(), "val1");
634        assert_eq!(attrs[1].key.as_str(), "key2");
635        assert_eq!(attrs[1].value.as_str(), "val2");
636    }
637
638    #[test]
639    fn test_parse_extra_tracing_attributes_empty() {
640        assert!(parse_extra_tracing_attributes("").is_empty());
641        assert!(parse_extra_tracing_attributes("  ").is_empty());
642    }
643
644    #[test]
645    fn test_parse_extra_tracing_attributes_invalid_pairs() {
646        // No '=' sign — should be skipped
647        let attrs = parse_extra_tracing_attributes("good=value,badpair,also_good=123");
648        assert_eq!(attrs.len(), 2);
649        assert_eq!(attrs[0].key.as_str(), "good");
650        assert_eq!(attrs[1].key.as_str(), "also_good");
651    }
652
653    #[test]
654    fn test_parse_extra_tracing_attributes_empty_key() {
655        // "=value" has empty key — should be skipped
656        let attrs = parse_extra_tracing_attributes("=value,key=val");
657        assert_eq!(attrs.len(), 1);
658        assert_eq!(attrs[0].key.as_str(), "key");
659    }
660
661    #[test]
662    fn test_parse_extra_tracing_attributes_value_with_equals() {
663        // value itself contains '=' — only first '=' is the delimiter
664        let attrs = parse_extra_tracing_attributes("expr=a=b");
665        assert_eq!(attrs.len(), 1);
666        assert_eq!(attrs[0].key.as_str(), "expr");
667        assert_eq!(attrs[0].value.as_str(), "a=b");
668    }
669
670    #[test]
671    fn test_parse_extra_tracing_attributes_trailing_comma() {
672        let attrs = parse_extra_tracing_attributes("k1=v1,k2=v2,");
673        assert_eq!(attrs.len(), 2);
674    }
675
676    #[test]
677    fn test_parse_extra_tracing_attributes_empty_value() {
678        let attrs = parse_extra_tracing_attributes("key=");
679        assert_eq!(attrs.len(), 1);
680        assert_eq!(attrs[0].key.as_str(), "key");
681        assert_eq!(attrs[0].value.as_str(), "");
682    }
683}