risingwave_connector/source/kafka/
stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use prometheus::Registry;
16use rdkafka::Statistics;
17use rdkafka::statistics::{Broker, ConsumerGroup, Partition, Topic, Window};
18use risingwave_common::metrics::{LabelGuardedIntGaugeVec, LabelGuardedUintGaugeVec};
19use risingwave_common::{
20    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
21};
22
23#[derive(Debug, Clone)]
24pub struct RdKafkaStats {
25    pub registry: Registry,
26
27    pub ts: LabelGuardedIntGaugeVec<2>,
28    pub time: LabelGuardedIntGaugeVec<2>,
29    pub age: LabelGuardedIntGaugeVec<2>,
30    pub replyq: LabelGuardedIntGaugeVec<2>,
31    pub msg_cnt: LabelGuardedUintGaugeVec<2>,
32    pub msg_size: LabelGuardedUintGaugeVec<2>,
33    pub msg_max: LabelGuardedUintGaugeVec<2>,
34    pub msg_size_max: LabelGuardedUintGaugeVec<2>,
35    pub tx: LabelGuardedIntGaugeVec<2>,
36    pub tx_bytes: LabelGuardedIntGaugeVec<2>,
37    pub rx: LabelGuardedIntGaugeVec<2>,
38    pub rx_bytes: LabelGuardedIntGaugeVec<2>,
39    pub tx_msgs: LabelGuardedIntGaugeVec<2>,
40    pub tx_msgs_bytes: LabelGuardedIntGaugeVec<2>,
41    pub rx_msgs: LabelGuardedIntGaugeVec<2>,
42    pub rx_msgs_bytes: LabelGuardedIntGaugeVec<2>,
43    pub simple_cnt: LabelGuardedIntGaugeVec<2>,
44    pub metadata_cache_cnt: LabelGuardedIntGaugeVec<2>,
45
46    pub broker_stats: BrokerStats,
47    pub topic_stats: TopicStats,
48    pub cgrp: ConsumerGroupStats,
49}
50
51#[derive(Debug, Clone)]
52pub struct BrokerStats {
53    pub registry: Registry,
54
55    pub state_age: LabelGuardedIntGaugeVec<4>,
56    pub outbuf_cnt: LabelGuardedIntGaugeVec<4>,
57    pub outbuf_msg_cnt: LabelGuardedIntGaugeVec<4>,
58    pub waitresp_cnt: LabelGuardedIntGaugeVec<4>,
59    pub waitresp_msg_cnt: LabelGuardedIntGaugeVec<4>,
60    pub tx: LabelGuardedUintGaugeVec<4>,
61    pub tx_bytes: LabelGuardedUintGaugeVec<4>,
62    pub tx_errs: LabelGuardedUintGaugeVec<4>,
63    pub tx_retries: LabelGuardedUintGaugeVec<4>,
64    pub tx_idle: LabelGuardedIntGaugeVec<4>,
65    pub req_timeouts: LabelGuardedUintGaugeVec<4>,
66    pub rx: LabelGuardedUintGaugeVec<4>,
67    pub rx_bytes: LabelGuardedUintGaugeVec<4>,
68    pub rx_errs: LabelGuardedUintGaugeVec<4>,
69    pub rx_corriderrs: LabelGuardedUintGaugeVec<4>,
70    pub rx_partial: LabelGuardedUintGaugeVec<4>,
71    pub rx_idle: LabelGuardedIntGaugeVec<4>,
72    pub req: LabelGuardedIntGaugeVec<5>,
73    pub zbuf_grow: LabelGuardedUintGaugeVec<4>,
74    pub buf_grow: LabelGuardedUintGaugeVec<4>,
75    pub wakeups: LabelGuardedUintGaugeVec<4>,
76    pub connects: LabelGuardedIntGaugeVec<4>,
77    pub disconnects: LabelGuardedIntGaugeVec<4>,
78    pub int_latency: StatsWindow,
79    pub outbuf_latency: StatsWindow,
80    pub rtt: StatsWindow,
81    pub throttle: StatsWindow,
82}
83
84#[derive(Debug, Clone)]
85pub struct TopicStats {
86    pub registry: Registry,
87
88    pub metadata_age: LabelGuardedIntGaugeVec<3>,
89    pub batch_size: StatsWindow,
90    pub batch_cnt: StatsWindow,
91    pub partitions: PartitionStats,
92}
93
94#[derive(Debug, Clone)]
95pub struct StatsWindow {
96    pub registry: Registry,
97
98    pub min: LabelGuardedIntGaugeVec<4>,
99    pub max: LabelGuardedIntGaugeVec<4>,
100    pub avg: LabelGuardedIntGaugeVec<4>,
101    pub sum: LabelGuardedIntGaugeVec<4>,
102    pub cnt: LabelGuardedIntGaugeVec<4>,
103    pub stddev: LabelGuardedIntGaugeVec<4>,
104    pub hdr_size: LabelGuardedIntGaugeVec<4>,
105    pub p50: LabelGuardedIntGaugeVec<4>,
106    pub p75: LabelGuardedIntGaugeVec<4>,
107    pub p90: LabelGuardedIntGaugeVec<4>,
108    pub p95: LabelGuardedIntGaugeVec<4>,
109    pub p99: LabelGuardedIntGaugeVec<4>,
110    pub p99_99: LabelGuardedIntGaugeVec<4>,
111    pub out_of_range: LabelGuardedIntGaugeVec<4>,
112}
113
114#[derive(Debug, Clone)]
115pub struct ConsumerGroupStats {
116    pub registry: Registry,
117
118    pub state_age: LabelGuardedIntGaugeVec<3>,
119    // todo: (do not know value set) join_state: IntGaugeVec,
120    pub rebalance_age: LabelGuardedIntGaugeVec<3>,
121    pub rebalance_cnt: LabelGuardedIntGaugeVec<3>,
122    // todo: (cannot handle string) rebalance_reason,
123    pub assignment_size: LabelGuardedIntGaugeVec<3>,
124}
125
126impl ConsumerGroupStats {
127    pub fn new(registry: Registry) -> Self {
128        let state_age = register_guarded_int_gauge_vec_with_registry!(
129            "rdkafka_consumer_group_state_age",
130            "Age of the consumer group state in seconds",
131            &["id", "client_id", "state"],
132            registry
133        )
134        .unwrap();
135        let rebalance_age = register_guarded_int_gauge_vec_with_registry!(
136            "rdkafka_consumer_group_rebalance_age",
137            "Age of the last rebalance in seconds",
138            &["id", "client_id", "state"],
139            registry
140        )
141        .unwrap();
142        let rebalance_cnt = register_guarded_int_gauge_vec_with_registry!(
143            "rdkafka_consumer_group_rebalance_cnt",
144            "Number of rebalances",
145            &["id", "client_id", "state"],
146            registry
147        )
148        .unwrap();
149        let assignment_size = register_guarded_int_gauge_vec_with_registry!(
150            "rdkafka_consumer_group_assignment_size",
151            "Number of assigned partitions",
152            &["id", "client_id", "state"],
153            registry
154        )
155        .unwrap();
156
157        Self {
158            registry,
159            state_age,
160            rebalance_age,
161            rebalance_cnt,
162            assignment_size,
163        }
164    }
165
166    pub fn report(&self, id: &str, client_id: &str, stats: &ConsumerGroup) {
167        let state = stats.state.as_str();
168        self.state_age
169            .with_guarded_label_values(&[id, client_id, state])
170            .set(stats.stateage);
171        self.rebalance_age
172            .with_guarded_label_values(&[id, client_id, state])
173            .set(stats.rebalance_age);
174        self.rebalance_cnt
175            .with_guarded_label_values(&[id, client_id, state])
176            .set(stats.rebalance_cnt);
177        self.assignment_size
178            .with_guarded_label_values(&[id, client_id, state])
179            .set(stats.assignment_size as i64);
180    }
181}
182
183impl StatsWindow {
184    pub fn new(registry: Registry, path: &str) -> Self {
185        let get_metric_name = |name: &str| format!("rdkafka_{}_{}", path, name);
186        let min = register_guarded_int_gauge_vec_with_registry!(
187            get_metric_name("min"),
188            "Minimum value",
189            &["id", "client_id", "broker", "topic"],
190            registry
191        )
192        .unwrap();
193        let max = register_guarded_int_gauge_vec_with_registry!(
194            get_metric_name("max"),
195            "Maximum value",
196            &["id", "client_id", "broker", "topic"],
197            registry
198        )
199        .unwrap();
200        let avg = register_guarded_int_gauge_vec_with_registry!(
201            get_metric_name("avg"),
202            "Average value",
203            &["id", "client_id", "broker", "topic"],
204            registry
205        )
206        .unwrap();
207        let sum = register_guarded_int_gauge_vec_with_registry!(
208            get_metric_name("sum"),
209            "Sum of values",
210            &["id", "client_id", "broker", "topic"],
211            registry
212        )
213        .unwrap();
214        let cnt = register_guarded_int_gauge_vec_with_registry!(
215            get_metric_name("cnt"),
216            "Count of values",
217            &["id", "client_id", "broker", "topic"],
218            registry
219        )
220        .unwrap();
221        let stddev = register_guarded_int_gauge_vec_with_registry!(
222            get_metric_name("stddev"),
223            "Standard deviation",
224            &["id", "client_id", "broker", "topic"],
225            registry
226        )
227        .unwrap();
228        let hdr_size = register_guarded_int_gauge_vec_with_registry!(
229            get_metric_name("hdrsize"),
230            "Size of the histogram header",
231            &["id", "client_id", "broker", "topic"],
232            registry
233        )
234        .unwrap();
235        let p50 = register_guarded_int_gauge_vec_with_registry!(
236            get_metric_name("p50"),
237            "50th percentile",
238            &["id", "client_id", "broker", "topic"],
239            registry
240        )
241        .unwrap();
242        let p75 = register_guarded_int_gauge_vec_with_registry!(
243            get_metric_name("p75"),
244            "75th percentile",
245            &["id", "client_id", "broker", "topic"],
246            registry
247        )
248        .unwrap();
249        let p90 = register_guarded_int_gauge_vec_with_registry!(
250            get_metric_name("p90"),
251            "90th percentile",
252            &["id", "client_id", "broker", "topic"],
253            registry
254        )
255        .unwrap();
256        let p95 = register_guarded_int_gauge_vec_with_registry!(
257            get_metric_name("p95"),
258            "95th percentile",
259            &["id", "client_id", "broker", "topic"],
260            registry
261        )
262        .unwrap();
263        let p99 = register_guarded_int_gauge_vec_with_registry!(
264            get_metric_name("p99"),
265            "99th percentile",
266            &["id", "client_id", "broker", "topic"],
267            registry
268        )
269        .unwrap();
270        let p99_99 = register_guarded_int_gauge_vec_with_registry!(
271            get_metric_name("p99_99"),
272            "99.99th percentile",
273            &["id", "client_id", "broker", "topic"],
274            registry
275        )
276        .unwrap();
277        let out_of_range = register_guarded_int_gauge_vec_with_registry!(
278            get_metric_name("out_of_range"),
279            "Out of range values",
280            &["id", "client_id", "broker", "topic"],
281            registry
282        )
283        .unwrap();
284
285        Self {
286            registry,
287            min,
288            max,
289            avg,
290            sum,
291            cnt,
292            stddev,
293            hdr_size,
294            p50,
295            p75,
296            p90,
297            p95,
298            p99,
299            p99_99,
300            out_of_range,
301        }
302    }
303
304    pub fn report(&self, id: &str, client_id: &str, broker: &str, topic: &str, stats: &Window) {
305        let labels = [id, client_id, broker, topic];
306
307        self.min.with_guarded_label_values(&labels).set(stats.min);
308        self.max.with_guarded_label_values(&labels).set(stats.max);
309        self.avg.with_guarded_label_values(&labels).set(stats.avg);
310        self.sum.with_guarded_label_values(&labels).set(stats.sum);
311        self.cnt.with_guarded_label_values(&labels).set(stats.cnt);
312        self.stddev
313            .with_guarded_label_values(&labels)
314            .set(stats.stddev);
315        self.hdr_size
316            .with_guarded_label_values(&labels)
317            .set(stats.hdrsize);
318        self.p50.with_guarded_label_values(&labels).set(stats.p50);
319        self.p75.with_guarded_label_values(&labels).set(stats.p75);
320        self.p90.with_guarded_label_values(&labels).set(stats.p90);
321        self.p99_99
322            .with_guarded_label_values(&labels)
323            .set(stats.p99_99);
324        self.out_of_range
325            .with_guarded_label_values(&labels)
326            .set(stats.outofrange);
327    }
328}
329
330impl TopicStats {
331    pub fn new(registry: Registry) -> Self {
332        let metadata_age = register_guarded_int_gauge_vec_with_registry!(
333            "rdkafka_topic_metadata_age",
334            "Age of the topic metadata in milliseconds",
335            &["id", "client_id", "topic"],
336            registry
337        )
338        .unwrap();
339        let batch_size = StatsWindow::new(registry.clone(), "topic_batchsize");
340        let batch_cnt = StatsWindow::new(registry.clone(), "topic_batchcnt");
341        let partitions = PartitionStats::new(registry.clone());
342        Self {
343            registry,
344            metadata_age,
345            batch_size,
346            batch_cnt,
347            partitions,
348        }
349    }
350
351    pub fn report(&self, id: &str, client_id: &str, stats: &Statistics) {
352        for (topic, topic_stats) in &stats.topics {
353            self.report_inner(id, client_id, topic, topic_stats);
354        }
355    }
356
357    fn report_inner(&self, id: &str, client_id: &str, topic: &str, stats: &Topic) {
358        self.metadata_age
359            .with_guarded_label_values(&[id, client_id, topic])
360            .set(stats.metadata_age);
361        self.batch_size
362            .report(id, client_id, "", topic, &stats.batchsize);
363        self.batch_cnt
364            .report(id, client_id, "", topic, &stats.batchcnt);
365        self.partitions.report(id, client_id, topic, stats)
366    }
367}
368
369#[derive(Debug, Clone)]
370pub struct PartitionStats {
371    pub registry: Registry,
372
373    pub msgq_cnt: LabelGuardedIntGaugeVec<4>,
374    pub msgq_bytes: LabelGuardedUintGaugeVec<4>,
375    pub xmit_msgq_cnt: LabelGuardedIntGaugeVec<4>,
376    pub xmit_msgq_bytes: LabelGuardedUintGaugeVec<4>,
377    pub fetchq_cnt: LabelGuardedIntGaugeVec<4>,
378    pub fetchq_size: LabelGuardedUintGaugeVec<4>,
379    pub query_offset: LabelGuardedIntGaugeVec<4>,
380    pub next_offset: LabelGuardedIntGaugeVec<4>,
381    pub app_offset: LabelGuardedIntGaugeVec<4>,
382    pub stored_offset: LabelGuardedIntGaugeVec<4>,
383    pub committed_offset: LabelGuardedIntGaugeVec<4>,
384    pub eof_offset: LabelGuardedIntGaugeVec<4>,
385    pub lo_offset: LabelGuardedIntGaugeVec<4>,
386    pub hi_offset: LabelGuardedIntGaugeVec<4>,
387    pub consumer_lag: LabelGuardedIntGaugeVec<4>,
388    pub consumer_lag_store: LabelGuardedIntGaugeVec<4>,
389    pub txmsgs: LabelGuardedUintGaugeVec<4>,
390    pub txbytes: LabelGuardedUintGaugeVec<4>,
391    pub rxmsgs: LabelGuardedUintGaugeVec<4>,
392    pub rxbytes: LabelGuardedUintGaugeVec<4>,
393    pub msgs: LabelGuardedUintGaugeVec<4>,
394    pub rx_ver_drops: LabelGuardedUintGaugeVec<4>,
395    pub msgs_inflight: LabelGuardedIntGaugeVec<4>,
396    pub next_ack_seq: LabelGuardedIntGaugeVec<4>,
397    pub next_err_seq: LabelGuardedIntGaugeVec<4>,
398    pub acked_msgid: LabelGuardedUintGaugeVec<4>,
399}
400
401impl PartitionStats {
402    pub fn new(registry: Registry) -> Self {
403        let msgq_cnt = register_guarded_int_gauge_vec_with_registry!(
404            "rdkafka_topic_partition_msgq_cnt",
405            "Number of messages in the producer queue",
406            &["id", "client_id", "topic", "partition"],
407            registry
408        )
409        .unwrap();
410        let msgq_bytes = register_guarded_uint_gauge_vec_with_registry!(
411            "rdkafka_topic_partition_msgq_bytes",
412            "Size of messages in the producer queue",
413            &["id", "client_id", "topic", "partition"],
414            registry
415        )
416        .unwrap();
417        let xmit_msgq_cnt = register_guarded_int_gauge_vec_with_registry!(
418            "rdkafka_topic_partition_xmit_msgq_cnt",
419            "Number of messages in the transmit queue",
420            &["id", "client_id", "topic", "partition"],
421            registry
422        )
423        .unwrap();
424        let xmit_msgq_bytes = register_guarded_uint_gauge_vec_with_registry!(
425            "rdkafka_topic_partition_xmit_msgq_bytes",
426            "Size of messages in the transmit queue",
427            &["id", "client_id", "topic", "partition"],
428            registry
429        )
430        .unwrap();
431        let fetchq_cnt = register_guarded_int_gauge_vec_with_registry!(
432            "rdkafka_topic_partition_fetchq_cnt",
433            "Number of messages in the fetch queue",
434            &["id", "client_id", "topic", "partition"],
435            registry
436        )
437        .unwrap();
438        let fetchq_size = register_guarded_uint_gauge_vec_with_registry!(
439            "rdkafka_topic_partition_fetchq_size",
440            "Size of messages in the fetch queue",
441            &["id", "client_id", "topic", "partition"],
442            registry
443        )
444        .unwrap();
445        let query_offset = register_guarded_int_gauge_vec_with_registry!(
446            "rdkafka_topic_partition_query_offset",
447            "Current query offset",
448            &["id", "client_id", "topic", "partition"],
449            registry
450        )
451        .unwrap();
452        let next_offset = register_guarded_int_gauge_vec_with_registry!(
453            "rdkafka_topic_partition_next_offset",
454            "Next offset to query",
455            &["id", "client_id", "topic", "partition"],
456            registry
457        )
458        .unwrap();
459        let app_offset = register_guarded_int_gauge_vec_with_registry!(
460            "rdkafka_topic_partition_app_offset",
461            "Last acknowledged offset",
462            &["id", "client_id", "topic", "partition"],
463            registry
464        )
465        .unwrap();
466        let stored_offset = register_guarded_int_gauge_vec_with_registry!(
467            "rdkafka_topic_partition_stored_offset",
468            "Last stored offset",
469            &["id", "client_id", "topic", "partition"],
470            registry
471        )
472        .unwrap();
473        let committed_offset = register_guarded_int_gauge_vec_with_registry!(
474            "rdkafka_topic_partition_committed_offset",
475            "Last committed offset",
476            &["id", "client_id", "topic", "partition"],
477            registry
478        )
479        .unwrap();
480        let eof_offset = register_guarded_int_gauge_vec_with_registry!(
481            "rdkafka_topic_partition_eof_offset",
482            "Last offset in broker log",
483            &["id", "client_id", "topic", "partition"],
484            registry
485        )
486        .unwrap();
487        let lo_offset = register_guarded_int_gauge_vec_with_registry!(
488            "rdkafka_topic_partition_lo_offset",
489            "Low offset",
490            &["id", "client_id", "topic", "partition"],
491            registry
492        )
493        .unwrap();
494        let hi_offset = register_guarded_int_gauge_vec_with_registry!(
495            "rdkafka_topic_partition_hi_offset",
496            "High offset",
497            &["id", "client_id", "topic", "partition"],
498            registry
499        )
500        .unwrap();
501        let consumer_lag = register_guarded_int_gauge_vec_with_registry!(
502            "rdkafka_topic_partition_consumer_lag",
503            "Consumer lag",
504            &["id", "client_id", "topic", "partition"],
505            registry
506        )
507        .unwrap();
508        let consumer_lag_store = register_guarded_int_gauge_vec_with_registry!(
509            "rdkafka_topic_partition_consumer_lag_store",
510            "Consumer lag stored",
511            &["id", "client_id", "topic", "partition"],
512            registry
513        )
514        .unwrap();
515        let txmsgs = register_guarded_uint_gauge_vec_with_registry!(
516            "rdkafka_topic_partition_txmsgs",
517            "Number of transmitted messages",
518            &["id", "client_id", "topic", "partition"],
519            registry
520        )
521        .unwrap();
522        let txbytes = register_guarded_uint_gauge_vec_with_registry!(
523            "rdkafka_topic_partition_txbytes",
524            "Number of transmitted bytes",
525            &["id", "client_id", "topic", "partition"],
526            registry
527        )
528        .unwrap();
529        let rxmsgs = register_guarded_uint_gauge_vec_with_registry!(
530            "rdkafka_topic_partition_rxmsgs",
531            "Number of received messages",
532            &["id", "client_id", "topic", "partition"],
533            registry
534        )
535        .unwrap();
536        let rxbytes = register_guarded_uint_gauge_vec_with_registry!(
537            "rdkafka_topic_partition_rxbytes",
538            "Number of received bytes",
539            &["id", "client_id", "topic", "partition"],
540            registry
541        )
542        .unwrap();
543        let msgs = register_guarded_uint_gauge_vec_with_registry!(
544            "rdkafka_topic_partition_msgs",
545            "Number of messages in partition",
546            &["id", "client_id", "topic", "partition"],
547            registry
548        )
549        .unwrap();
550        let rx_ver_drops = register_guarded_uint_gauge_vec_with_registry!(
551            "rdkafka_topic_partition_rx_ver_drops",
552            "Number of received messages dropped due to version mismatch",
553            &["id", "client_id", "topic", "partition"],
554            registry
555        )
556        .unwrap();
557        let msgs_inflight = register_guarded_int_gauge_vec_with_registry!(
558            "rdkafka_topic_partition_msgs_inflight",
559            "Number of messages in-flight",
560            &["id", "client_id", "topic", "partition"],
561            registry
562        )
563        .unwrap();
564        let next_ack_seq = register_guarded_int_gauge_vec_with_registry!(
565            "rdkafka_topic_partition_next_ack_seq",
566            "Next ack sequence number",
567            &["id", "client_id", "topic", "partition"],
568            registry
569        )
570        .unwrap();
571        let next_err_seq = register_guarded_int_gauge_vec_with_registry!(
572            "rdkafka_topic_partition_next_err_seq",
573            "Next error sequence number",
574            &["id", "client_id", "topic", "partition"],
575            registry
576        )
577        .unwrap();
578        let acked_msgid = register_guarded_uint_gauge_vec_with_registry!(
579            "rdkafka_topic_partition_acked_msgid",
580            "Acknowledged message ID",
581            &["id", "client_id", "topic", "partition"],
582            registry
583        )
584        .unwrap();
585
586        Self {
587            registry,
588            msgq_cnt,
589            msgq_bytes,
590            xmit_msgq_cnt,
591            xmit_msgq_bytes,
592            fetchq_cnt,
593            fetchq_size,
594            query_offset,
595            next_offset,
596            app_offset,
597            stored_offset,
598            committed_offset,
599            eof_offset,
600            lo_offset,
601            hi_offset,
602            consumer_lag,
603            consumer_lag_store,
604            txmsgs,
605            txbytes,
606            rxmsgs,
607            rxbytes,
608            msgs,
609            rx_ver_drops,
610            msgs_inflight,
611            next_ack_seq,
612            next_err_seq,
613            acked_msgid,
614        }
615    }
616
617    pub fn report(&self, id: &str, client_id: &str, topic: &str, stats: &Topic) {
618        for partition_stats in stats.partitions.values() {
619            self.report_inner(id, client_id, topic, partition_stats);
620        }
621    }
622
623    fn report_inner(&self, id: &str, client_id: &str, topic: &str, stats: &Partition) {
624        let labels = [id, client_id, topic, &stats.partition.to_string()];
625
626        self.msgq_cnt
627            .with_guarded_label_values(&labels)
628            .set(stats.msgq_cnt);
629        self.msgq_bytes
630            .with_guarded_label_values(&labels)
631            .set(stats.msgq_bytes);
632        self.xmit_msgq_cnt
633            .with_guarded_label_values(&labels)
634            .set(stats.xmit_msgq_cnt);
635        self.xmit_msgq_bytes
636            .with_guarded_label_values(&labels)
637            .set(stats.xmit_msgq_bytes);
638        self.fetchq_cnt
639            .with_guarded_label_values(&labels)
640            .set(stats.fetchq_cnt);
641        self.fetchq_size
642            .with_guarded_label_values(&labels)
643            .set(stats.fetchq_size);
644        self.query_offset
645            .with_guarded_label_values(&labels)
646            .set(stats.query_offset);
647        self.next_offset
648            .with_guarded_label_values(&labels)
649            .set(stats.next_offset);
650        self.app_offset
651            .with_guarded_label_values(&labels)
652            .set(stats.app_offset);
653        self.stored_offset
654            .with_guarded_label_values(&labels)
655            .set(stats.stored_offset);
656        self.committed_offset
657            .with_guarded_label_values(&labels)
658            .set(stats.committed_offset);
659        self.eof_offset
660            .with_guarded_label_values(&labels)
661            .set(stats.eof_offset);
662        self.lo_offset
663            .with_guarded_label_values(&labels)
664            .set(stats.lo_offset);
665        self.hi_offset
666            .with_guarded_label_values(&labels)
667            .set(stats.hi_offset);
668        self.consumer_lag
669            .with_guarded_label_values(&labels)
670            .set(stats.consumer_lag);
671        self.consumer_lag_store
672            .with_guarded_label_values(&labels)
673            .set(stats.consumer_lag_stored);
674        self.txmsgs
675            .with_guarded_label_values(&labels)
676            .set(stats.txmsgs);
677        self.txbytes
678            .with_guarded_label_values(&labels)
679            .set(stats.txbytes);
680        self.rxmsgs
681            .with_guarded_label_values(&labels)
682            .set(stats.rxmsgs);
683        self.rxbytes
684            .with_guarded_label_values(&labels)
685            .set(stats.rxbytes);
686        self.msgs.with_guarded_label_values(&labels).set(stats.msgs);
687        self.rx_ver_drops
688            .with_guarded_label_values(&labels)
689            .set(stats.rx_ver_drops);
690        self.msgs_inflight
691            .with_guarded_label_values(&labels)
692            .set(stats.msgs_inflight);
693        self.next_ack_seq
694            .with_guarded_label_values(&labels)
695            .set(stats.next_ack_seq);
696        self.next_err_seq
697            .with_guarded_label_values(&labels)
698            .set(stats.next_err_seq);
699        self.acked_msgid
700            .with_guarded_label_values(&labels)
701            .set(stats.acked_msgid);
702    }
703}
704
705impl RdKafkaStats {
706    pub fn new(registry: Registry) -> Self {
707        let ts = register_guarded_int_gauge_vec_with_registry!(
708            "rdkafka_top_ts",
709            "librdkafka's internal monotonic clock (microseconds)",
710            // we cannot tell whether it is for consumer or producer,
711            // it may refer to source_id or sink_id
712            &["id", "client_id"],
713            registry
714        )
715        .unwrap();
716        let time = register_guarded_int_gauge_vec_with_registry!(
717            "rdkafka_top_time",
718            "Wall clock time in seconds since the epoch",
719            &["id", "client_id"],
720            registry
721        )
722        .unwrap();
723        let age = register_guarded_int_gauge_vec_with_registry!(
724            "rdkafka_top_age",
725            "Age of the topic metadata in milliseconds",
726            &["id", "client_id"],
727            registry
728        )
729        .unwrap();
730        let replyq = register_guarded_int_gauge_vec_with_registry!(
731            "rdkafka_top_replyq",
732            "Number of replies waiting to be served",
733            &["id", "client_id"],
734            registry
735        )
736        .unwrap();
737        let msg_cnt = register_guarded_uint_gauge_vec_with_registry!(
738            "rdkafka_top_msg_cnt",
739            "Number of messages in all topics",
740            &["id", "client_id"],
741            registry
742        )
743        .unwrap();
744        let msg_size = register_guarded_uint_gauge_vec_with_registry!(
745            "rdkafka_top_msg_size",
746            "Size of messages in all topics",
747            &["id", "client_id"],
748            registry
749        )
750        .unwrap();
751        let msg_max = register_guarded_uint_gauge_vec_with_registry!(
752            "rdkafka_top_msg_max",
753            "Maximum message size in all topics",
754            &["id", "client_id"],
755            registry
756        )
757        .unwrap();
758        let msg_size_max = register_guarded_uint_gauge_vec_with_registry!(
759            "rdkafka_top_msg_size_max",
760            "Maximum message size in all topics",
761            &["id", "client_id"],
762            registry
763        )
764        .unwrap();
765        let tx = register_guarded_int_gauge_vec_with_registry!(
766            "rdkafka_top_tx",
767            "Number of transmitted messages",
768            &["id", "client_id"],
769            registry
770        )
771        .unwrap();
772        let tx_bytes = register_guarded_int_gauge_vec_with_registry!(
773            "rdkafka_top_tx_bytes",
774            "Number of transmitted bytes",
775            &["id", "client_id"],
776            registry
777        )
778        .unwrap();
779        let rx = register_guarded_int_gauge_vec_with_registry!(
780            "rdkafka_top_rx",
781            "Number of received messages",
782            &["id", "client_id"],
783            registry
784        )
785        .unwrap();
786        let rx_bytes = register_guarded_int_gauge_vec_with_registry!(
787            "rdkafka_top_rx_bytes",
788            "Number of received bytes",
789            &["id", "client_id"],
790            registry
791        )
792        .unwrap();
793        let tx_msgs = register_guarded_int_gauge_vec_with_registry!(
794            "rdkafka_top_tx_msgs",
795            "Number of transmitted messages",
796            &["id", "client_id"],
797            registry
798        )
799        .unwrap();
800        let tx_msgs_bytes = register_guarded_int_gauge_vec_with_registry!(
801            "rdkafka_top_tx_msgs_bytes",
802            "Number of transmitted bytes",
803            &["id", "client_id"],
804            registry
805        )
806        .unwrap();
807        let rx_msgs = register_guarded_int_gauge_vec_with_registry!(
808            "rdkafka_top_rx_msgs",
809            "Number of received messages",
810            &["id", "client_id"],
811            registry
812        )
813        .unwrap();
814        let rx_msgs_bytes = register_guarded_int_gauge_vec_with_registry!(
815            "rdkafka_top_rx_msgs_bytes",
816            "Number of received bytes",
817            &["id", "client_id"],
818            registry
819        )
820        .unwrap();
821        let simple_cnt = register_guarded_int_gauge_vec_with_registry!(
822            "rdkafka_top_simple_cnt",
823            "Number of simple consumer queues",
824            &["id", "client_id"],
825            registry
826        )
827        .unwrap();
828        let metadata_cache_cnt = register_guarded_int_gauge_vec_with_registry!(
829            "rdkafka_top_metadata_cache_cnt",
830            "Number of entries in the metadata cache",
831            &["id", "client_id"],
832            registry
833        )
834        .unwrap();
835
836        let broker_stats = BrokerStats::new(registry.clone());
837        let topic_stats = TopicStats::new(registry.clone());
838        let cgrp = ConsumerGroupStats::new(registry.clone());
839        RdKafkaStats {
840            registry,
841            ts,
842            time,
843            age,
844            replyq,
845            msg_cnt,
846            msg_size,
847            msg_max,
848            msg_size_max,
849            tx,
850            tx_bytes,
851            rx,
852            rx_bytes,
853            tx_msgs,
854            tx_msgs_bytes,
855            rx_msgs,
856            rx_msgs_bytes,
857            simple_cnt,
858            metadata_cache_cnt,
859            broker_stats,
860            topic_stats,
861            cgrp,
862        }
863    }
864
865    pub fn report(&self, id: &str, stats: &Statistics) {
866        let client_id = stats.name.as_str();
867        self.ts
868            .with_guarded_label_values(&[id, client_id])
869            .set(stats.ts);
870        self.time
871            .with_guarded_label_values(&[id, client_id])
872            .set(stats.time);
873        self.age
874            .with_guarded_label_values(&[id, client_id])
875            .set(stats.age);
876        self.replyq
877            .with_guarded_label_values(&[id, client_id])
878            .set(stats.replyq);
879        self.msg_cnt
880            .with_guarded_label_values(&[id, client_id])
881            .set(stats.msg_cnt);
882        self.msg_size
883            .with_guarded_label_values(&[id, client_id])
884            .set(stats.msg_size);
885        self.msg_max
886            .with_guarded_label_values(&[id, client_id])
887            .set(stats.msg_max);
888        self.msg_size_max
889            .with_guarded_label_values(&[id, client_id])
890            .set(stats.msg_size_max);
891        self.tx
892            .with_guarded_label_values(&[id, client_id])
893            .set(stats.tx);
894        self.tx_bytes
895            .with_guarded_label_values(&[id, client_id])
896            .set(stats.tx_bytes);
897        self.rx
898            .with_guarded_label_values(&[id, client_id])
899            .set(stats.rx);
900        self.rx_bytes
901            .with_guarded_label_values(&[id, client_id])
902            .set(stats.rx_bytes);
903        self.tx_msgs
904            .with_guarded_label_values(&[id, client_id])
905            .set(stats.txmsgs);
906        self.tx_msgs_bytes
907            .with_guarded_label_values(&[id, client_id])
908            .set(stats.txmsg_bytes);
909        self.rx_msgs
910            .with_guarded_label_values(&[id, client_id])
911            .set(stats.rxmsgs);
912        self.rx_msgs_bytes
913            .with_guarded_label_values(&[id, client_id])
914            .set(stats.rxmsg_bytes);
915        self.simple_cnt
916            .with_guarded_label_values(&[id, client_id])
917            .set(stats.simple_cnt);
918        self.metadata_cache_cnt
919            .with_guarded_label_values(&[id, client_id])
920            .set(stats.metadata_cache_cnt);
921
922        self.broker_stats.report(id, client_id, stats);
923        self.topic_stats.report(id, client_id, stats);
924        if let Some(cgrp) = &stats.cgrp {
925            self.cgrp.report(id, client_id, cgrp)
926        }
927    }
928}
929
930impl BrokerStats {
931    pub fn new(registry: Registry) -> Self {
932        let state_age = register_guarded_int_gauge_vec_with_registry!(
933            "rdkafka_broker_state_age",
934            "Age of the broker state in seconds",
935            &["id", "client_id", "broker", "state"],
936            registry
937        )
938        .unwrap();
939        let outbuf_cnt = register_guarded_int_gauge_vec_with_registry!(
940            "rdkafka_broker_outbuf_cnt",
941            "Number of messages waiting to be sent to broker",
942            &["id", "client_id", "broker", "state"],
943            registry
944        )
945        .unwrap();
946        let outbuf_msg_cnt = register_guarded_int_gauge_vec_with_registry!(
947            "rdkafka_broker_outbuf_msg_cnt",
948            "Number of messages waiting to be sent to broker",
949            &["id", "client_id", "broker", "state"],
950            registry
951        )
952        .unwrap();
953        let waitresp_cnt = register_guarded_int_gauge_vec_with_registry!(
954            "rdkafka_broker_waitresp_cnt",
955            "Number of requests waiting for response",
956            &["id", "client_id", "broker", "state"],
957            registry
958        )
959        .unwrap();
960        let waitresp_msg_cnt = register_guarded_int_gauge_vec_with_registry!(
961            "rdkafka_broker_waitresp_msg_cnt",
962            "Number of messages waiting for response",
963            &["id", "client_id", "broker", "state"],
964            registry
965        )
966        .unwrap();
967        let tx = register_guarded_uint_gauge_vec_with_registry!(
968            "rdkafka_broker_tx",
969            "Number of transmitted messages",
970            &["id", "client_id", "broker", "state"],
971            registry
972        )
973        .unwrap();
974        let tx_bytes = register_guarded_uint_gauge_vec_with_registry!(
975            "rdkafka_broker_tx_bytes",
976            "Number of transmitted bytes",
977            &["id", "client_id", "broker", "state"],
978            registry
979        )
980        .unwrap();
981        let tx_errs = register_guarded_uint_gauge_vec_with_registry!(
982            "rdkafka_broker_tx_errs",
983            "Number of failed transmitted messages",
984            &["id", "client_id", "broker", "state"],
985            registry
986        )
987        .unwrap();
988        let tx_retries = register_guarded_uint_gauge_vec_with_registry!(
989            "rdkafka_broker_tx_retries",
990            "Number of message retries",
991            &["id", "client_id", "broker", "state"],
992            registry
993        )
994        .unwrap();
995        let tx_idle = register_guarded_int_gauge_vec_with_registry!(
996            "rdkafka_broker_tx_idle",
997            "Number of idle transmit connections",
998            &["id", "client_id", "broker", "state"],
999            registry
1000        )
1001        .unwrap();
1002        let req_timeouts = register_guarded_uint_gauge_vec_with_registry!(
1003            "rdkafka_broker_req_timeouts",
1004            "Number of request timeouts",
1005            &["id", "client_id", "broker", "state"],
1006            registry
1007        )
1008        .unwrap();
1009        let rx = register_guarded_uint_gauge_vec_with_registry!(
1010            "rdkafka_broker_rx",
1011            "Number of received messages",
1012            &["id", "client_id", "broker", "state"],
1013            registry
1014        )
1015        .unwrap();
1016        let rx_bytes = register_guarded_uint_gauge_vec_with_registry!(
1017            "rdkafka_broker_rx_bytes",
1018            "Number of received bytes",
1019            &["id", "client_id", "broker", "state"],
1020            registry
1021        )
1022        .unwrap();
1023        let rx_errs = register_guarded_uint_gauge_vec_with_registry!(
1024            "rdkafka_broker_rx_errs",
1025            "Number of failed received messages",
1026            &["id", "client_id", "broker", "state"],
1027            registry
1028        )
1029        .unwrap();
1030        let rx_corriderrs = register_guarded_uint_gauge_vec_with_registry!(
1031            "rdkafka_broker_rx_corriderrs",
1032            "Number of received messages with invalid correlation id",
1033            &["id", "client_id", "broker", "state"],
1034            registry
1035        )
1036        .unwrap();
1037        let rx_partial = register_guarded_uint_gauge_vec_with_registry!(
1038            "rdkafka_broker_rx_partial",
1039            "Number of partial messages received",
1040            &["id", "client_id", "broker", "state"],
1041            registry
1042        )
1043        .unwrap();
1044        let rx_idle = register_guarded_int_gauge_vec_with_registry!(
1045            "rdkafka_broker_rx_idle",
1046            "Number of idle receive connections",
1047            &["id", "client_id", "broker", "state"],
1048            registry
1049        )
1050        .unwrap();
1051        let req = register_guarded_int_gauge_vec_with_registry!(
1052            "rdkafka_broker_req",
1053            "Number of requests in flight",
1054            &["id", "client_id", "broker", "state", "type"],
1055            registry
1056        )
1057        .unwrap();
1058        let zbuf_grow = register_guarded_uint_gauge_vec_with_registry!(
1059            "rdkafka_broker_zbuf_grow",
1060            "Number of times the broker's output buffer has been reallocated",
1061            &["id", "client_id", "broker", "state"],
1062            registry
1063        )
1064        .unwrap();
1065        let buf_grow = register_guarded_uint_gauge_vec_with_registry!(
1066            "rdkafka_broker_buf_grow",
1067            "Number of times the broker's input buffer has been reallocated",
1068            &["id", "client_id", "broker", "state"],
1069            registry
1070        )
1071        .unwrap();
1072        let wakeups = register_guarded_uint_gauge_vec_with_registry!(
1073            "rdkafka_broker_wakeups",
1074            "Number of wakeups",
1075            &["id", "client_id", "broker", "state"],
1076            registry
1077        )
1078        .unwrap();
1079        let connects = register_guarded_int_gauge_vec_with_registry!(
1080            "rdkafka_broker_connects",
1081            "Number of connection attempts",
1082            &["id", "client_id", "broker", "state"],
1083            registry
1084        )
1085        .unwrap();
1086        let disconnects = register_guarded_int_gauge_vec_with_registry!(
1087            "rdkafka_broker_disconnects",
1088            "Number of disconnects",
1089            &["id", "client_id", "broker", "state"],
1090            registry
1091        )
1092        .unwrap();
1093        let int_latency = StatsWindow::new(registry.clone(), "broker_intlatency");
1094        let outbuf_latency = StatsWindow::new(registry.clone(), "broker_outbuflatency");
1095        let rtt = StatsWindow::new(registry.clone(), "broker_rtt");
1096        let throttle = StatsWindow::new(registry.clone(), "broker_throttle");
1097
1098        BrokerStats {
1099            registry,
1100            state_age,
1101            outbuf_cnt,
1102            outbuf_msg_cnt,
1103            waitresp_cnt,
1104            waitresp_msg_cnt,
1105            tx,
1106            tx_bytes,
1107            tx_errs,
1108            tx_retries,
1109            tx_idle,
1110            req_timeouts,
1111            rx,
1112            rx_bytes,
1113            rx_errs,
1114            rx_corriderrs,
1115            rx_partial,
1116            rx_idle,
1117            req,
1118            zbuf_grow,
1119            buf_grow,
1120            wakeups,
1121            connects,
1122            disconnects,
1123            int_latency,
1124            outbuf_latency,
1125            rtt,
1126            throttle,
1127        }
1128    }
1129
1130    pub fn report(&self, id: &str, client_id: &str, stats: &Statistics) {
1131        for broker_stats in stats.brokers.values() {
1132            self.report_inner(id, client_id, broker_stats);
1133        }
1134    }
1135
1136    fn report_inner(&self, id: &str, client_id: &str, stats: &Broker) {
1137        let broker = stats.nodename.as_str();
1138        let state = stats.state.as_str();
1139        let labels = [id, client_id, broker, state];
1140
1141        self.state_age
1142            .with_guarded_label_values(&labels)
1143            .set(stats.stateage);
1144        self.outbuf_cnt
1145            .with_guarded_label_values(&labels)
1146            .set(stats.outbuf_cnt);
1147        self.outbuf_msg_cnt
1148            .with_guarded_label_values(&labels)
1149            .set(stats.outbuf_msg_cnt);
1150        self.waitresp_cnt
1151            .with_guarded_label_values(&labels)
1152            .set(stats.waitresp_cnt);
1153        self.waitresp_msg_cnt
1154            .with_guarded_label_values(&labels)
1155            .set(stats.waitresp_msg_cnt);
1156        self.tx.with_guarded_label_values(&labels).set(stats.tx);
1157        self.tx_bytes
1158            .with_guarded_label_values(&labels)
1159            .set(stats.txbytes);
1160        self.tx_errs
1161            .with_guarded_label_values(&labels)
1162            .set(stats.txerrs);
1163        self.tx_retries
1164            .with_guarded_label_values(&labels)
1165            .set(stats.txretries);
1166        self.tx_idle
1167            .with_guarded_label_values(&labels)
1168            .set(stats.txidle);
1169        self.req_timeouts
1170            .with_guarded_label_values(&labels)
1171            .set(stats.req_timeouts);
1172        self.rx.with_guarded_label_values(&labels).set(stats.rx);
1173        self.rx_bytes
1174            .with_guarded_label_values(&labels)
1175            .set(stats.rxbytes);
1176        self.rx_errs
1177            .with_guarded_label_values(&labels)
1178            .set(stats.rxerrs);
1179        self.rx_corriderrs
1180            .with_guarded_label_values(&labels)
1181            .set(stats.rxcorriderrs);
1182        self.rx_partial
1183            .with_guarded_label_values(&labels)
1184            .set(stats.rxpartial);
1185        self.rx_idle
1186            .with_guarded_label_values(&labels)
1187            .set(stats.rxidle);
1188        for (req_type, req_cnt) in &stats.req {
1189            self.req
1190                .with_guarded_label_values(&[id, client_id, broker, state, req_type])
1191                .set(*req_cnt);
1192        }
1193        self.zbuf_grow
1194            .with_guarded_label_values(&labels)
1195            .set(stats.zbuf_grow);
1196        self.buf_grow
1197            .with_guarded_label_values(&labels)
1198            .set(stats.buf_grow);
1199        if let Some(wakeups) = stats.wakeups {
1200            self.wakeups.with_guarded_label_values(&labels).set(wakeups);
1201        }
1202        if let Some(connects) = stats.connects {
1203            self.connects
1204                .with_guarded_label_values(&labels)
1205                .set(connects);
1206        }
1207        if let Some(disconnects) = stats.disconnects {
1208            self.disconnects
1209                .with_guarded_label_values(&labels)
1210                .set(disconnects);
1211        }
1212        if let Some(int_latency) = &stats.int_latency {
1213            self.int_latency
1214                .report(id, client_id, broker, "", int_latency);
1215        }
1216        if let Some(outbuf_latency) = &stats.outbuf_latency {
1217            self.outbuf_latency
1218                .report(id, client_id, broker, "", outbuf_latency);
1219        }
1220        if let Some(rtt) = &stats.rtt {
1221            self.rtt.report(id, client_id, broker, "", rtt);
1222        }
1223        if let Some(throttle) = &stats.throttle {
1224            self.throttle.report(id, client_id, broker, "", throttle);
1225        }
1226    }
1227}