1use prometheus::Registry;
16use rdkafka::Statistics;
17use rdkafka::statistics::{Broker, ConsumerGroup, Partition, Topic, Window};
18use risingwave_common::metrics::{LabelGuardedIntGaugeVec, LabelGuardedUintGaugeVec};
19use risingwave_common::{
20 register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
21};
22
23#[derive(Debug, Clone)]
24pub struct RdKafkaStats {
25 pub registry: Registry,
26
27 pub ts: LabelGuardedIntGaugeVec<2>,
28 pub time: LabelGuardedIntGaugeVec<2>,
29 pub age: LabelGuardedIntGaugeVec<2>,
30 pub replyq: LabelGuardedIntGaugeVec<2>,
31 pub msg_cnt: LabelGuardedUintGaugeVec<2>,
32 pub msg_size: LabelGuardedUintGaugeVec<2>,
33 pub msg_max: LabelGuardedUintGaugeVec<2>,
34 pub msg_size_max: LabelGuardedUintGaugeVec<2>,
35 pub tx: LabelGuardedIntGaugeVec<2>,
36 pub tx_bytes: LabelGuardedIntGaugeVec<2>,
37 pub rx: LabelGuardedIntGaugeVec<2>,
38 pub rx_bytes: LabelGuardedIntGaugeVec<2>,
39 pub tx_msgs: LabelGuardedIntGaugeVec<2>,
40 pub tx_msgs_bytes: LabelGuardedIntGaugeVec<2>,
41 pub rx_msgs: LabelGuardedIntGaugeVec<2>,
42 pub rx_msgs_bytes: LabelGuardedIntGaugeVec<2>,
43 pub simple_cnt: LabelGuardedIntGaugeVec<2>,
44 pub metadata_cache_cnt: LabelGuardedIntGaugeVec<2>,
45
46 pub broker_stats: BrokerStats,
47 pub topic_stats: TopicStats,
48 pub cgrp: ConsumerGroupStats,
49}
50
51#[derive(Debug, Clone)]
52pub struct BrokerStats {
53 pub registry: Registry,
54
55 pub state_age: LabelGuardedIntGaugeVec<4>,
56 pub outbuf_cnt: LabelGuardedIntGaugeVec<4>,
57 pub outbuf_msg_cnt: LabelGuardedIntGaugeVec<4>,
58 pub waitresp_cnt: LabelGuardedIntGaugeVec<4>,
59 pub waitresp_msg_cnt: LabelGuardedIntGaugeVec<4>,
60 pub tx: LabelGuardedUintGaugeVec<4>,
61 pub tx_bytes: LabelGuardedUintGaugeVec<4>,
62 pub tx_errs: LabelGuardedUintGaugeVec<4>,
63 pub tx_retries: LabelGuardedUintGaugeVec<4>,
64 pub tx_idle: LabelGuardedIntGaugeVec<4>,
65 pub req_timeouts: LabelGuardedUintGaugeVec<4>,
66 pub rx: LabelGuardedUintGaugeVec<4>,
67 pub rx_bytes: LabelGuardedUintGaugeVec<4>,
68 pub rx_errs: LabelGuardedUintGaugeVec<4>,
69 pub rx_corriderrs: LabelGuardedUintGaugeVec<4>,
70 pub rx_partial: LabelGuardedUintGaugeVec<4>,
71 pub rx_idle: LabelGuardedIntGaugeVec<4>,
72 pub req: LabelGuardedIntGaugeVec<5>,
73 pub zbuf_grow: LabelGuardedUintGaugeVec<4>,
74 pub buf_grow: LabelGuardedUintGaugeVec<4>,
75 pub wakeups: LabelGuardedUintGaugeVec<4>,
76 pub connects: LabelGuardedIntGaugeVec<4>,
77 pub disconnects: LabelGuardedIntGaugeVec<4>,
78 pub int_latency: StatsWindow,
79 pub outbuf_latency: StatsWindow,
80 pub rtt: StatsWindow,
81 pub throttle: StatsWindow,
82}
83
84#[derive(Debug, Clone)]
85pub struct TopicStats {
86 pub registry: Registry,
87
88 pub metadata_age: LabelGuardedIntGaugeVec<3>,
89 pub batch_size: StatsWindow,
90 pub batch_cnt: StatsWindow,
91 pub partitions: PartitionStats,
92}
93
94#[derive(Debug, Clone)]
95pub struct StatsWindow {
96 pub registry: Registry,
97
98 pub min: LabelGuardedIntGaugeVec<4>,
99 pub max: LabelGuardedIntGaugeVec<4>,
100 pub avg: LabelGuardedIntGaugeVec<4>,
101 pub sum: LabelGuardedIntGaugeVec<4>,
102 pub cnt: LabelGuardedIntGaugeVec<4>,
103 pub stddev: LabelGuardedIntGaugeVec<4>,
104 pub hdr_size: LabelGuardedIntGaugeVec<4>,
105 pub p50: LabelGuardedIntGaugeVec<4>,
106 pub p75: LabelGuardedIntGaugeVec<4>,
107 pub p90: LabelGuardedIntGaugeVec<4>,
108 pub p95: LabelGuardedIntGaugeVec<4>,
109 pub p99: LabelGuardedIntGaugeVec<4>,
110 pub p99_99: LabelGuardedIntGaugeVec<4>,
111 pub out_of_range: LabelGuardedIntGaugeVec<4>,
112}
113
114#[derive(Debug, Clone)]
115pub struct ConsumerGroupStats {
116 pub registry: Registry,
117
118 pub state_age: LabelGuardedIntGaugeVec<3>,
119 pub rebalance_age: LabelGuardedIntGaugeVec<3>,
121 pub rebalance_cnt: LabelGuardedIntGaugeVec<3>,
122 pub assignment_size: LabelGuardedIntGaugeVec<3>,
124}
125
126impl ConsumerGroupStats {
127 pub fn new(registry: Registry) -> Self {
128 let state_age = register_guarded_int_gauge_vec_with_registry!(
129 "rdkafka_consumer_group_state_age",
130 "Age of the consumer group state in seconds",
131 &["id", "client_id", "state"],
132 registry
133 )
134 .unwrap();
135 let rebalance_age = register_guarded_int_gauge_vec_with_registry!(
136 "rdkafka_consumer_group_rebalance_age",
137 "Age of the last rebalance in seconds",
138 &["id", "client_id", "state"],
139 registry
140 )
141 .unwrap();
142 let rebalance_cnt = register_guarded_int_gauge_vec_with_registry!(
143 "rdkafka_consumer_group_rebalance_cnt",
144 "Number of rebalances",
145 &["id", "client_id", "state"],
146 registry
147 )
148 .unwrap();
149 let assignment_size = register_guarded_int_gauge_vec_with_registry!(
150 "rdkafka_consumer_group_assignment_size",
151 "Number of assigned partitions",
152 &["id", "client_id", "state"],
153 registry
154 )
155 .unwrap();
156
157 Self {
158 registry,
159 state_age,
160 rebalance_age,
161 rebalance_cnt,
162 assignment_size,
163 }
164 }
165
166 pub fn report(&self, id: &str, client_id: &str, stats: &ConsumerGroup) {
167 let state = stats.state.as_str();
168 self.state_age
169 .with_guarded_label_values(&[id, client_id, state])
170 .set(stats.stateage);
171 self.rebalance_age
172 .with_guarded_label_values(&[id, client_id, state])
173 .set(stats.rebalance_age);
174 self.rebalance_cnt
175 .with_guarded_label_values(&[id, client_id, state])
176 .set(stats.rebalance_cnt);
177 self.assignment_size
178 .with_guarded_label_values(&[id, client_id, state])
179 .set(stats.assignment_size as i64);
180 }
181}
182
183impl StatsWindow {
184 pub fn new(registry: Registry, path: &str) -> Self {
185 let get_metric_name = |name: &str| format!("rdkafka_{}_{}", path, name);
186 let min = register_guarded_int_gauge_vec_with_registry!(
187 get_metric_name("min"),
188 "Minimum value",
189 &["id", "client_id", "broker", "topic"],
190 registry
191 )
192 .unwrap();
193 let max = register_guarded_int_gauge_vec_with_registry!(
194 get_metric_name("max"),
195 "Maximum value",
196 &["id", "client_id", "broker", "topic"],
197 registry
198 )
199 .unwrap();
200 let avg = register_guarded_int_gauge_vec_with_registry!(
201 get_metric_name("avg"),
202 "Average value",
203 &["id", "client_id", "broker", "topic"],
204 registry
205 )
206 .unwrap();
207 let sum = register_guarded_int_gauge_vec_with_registry!(
208 get_metric_name("sum"),
209 "Sum of values",
210 &["id", "client_id", "broker", "topic"],
211 registry
212 )
213 .unwrap();
214 let cnt = register_guarded_int_gauge_vec_with_registry!(
215 get_metric_name("cnt"),
216 "Count of values",
217 &["id", "client_id", "broker", "topic"],
218 registry
219 )
220 .unwrap();
221 let stddev = register_guarded_int_gauge_vec_with_registry!(
222 get_metric_name("stddev"),
223 "Standard deviation",
224 &["id", "client_id", "broker", "topic"],
225 registry
226 )
227 .unwrap();
228 let hdr_size = register_guarded_int_gauge_vec_with_registry!(
229 get_metric_name("hdrsize"),
230 "Size of the histogram header",
231 &["id", "client_id", "broker", "topic"],
232 registry
233 )
234 .unwrap();
235 let p50 = register_guarded_int_gauge_vec_with_registry!(
236 get_metric_name("p50"),
237 "50th percentile",
238 &["id", "client_id", "broker", "topic"],
239 registry
240 )
241 .unwrap();
242 let p75 = register_guarded_int_gauge_vec_with_registry!(
243 get_metric_name("p75"),
244 "75th percentile",
245 &["id", "client_id", "broker", "topic"],
246 registry
247 )
248 .unwrap();
249 let p90 = register_guarded_int_gauge_vec_with_registry!(
250 get_metric_name("p90"),
251 "90th percentile",
252 &["id", "client_id", "broker", "topic"],
253 registry
254 )
255 .unwrap();
256 let p95 = register_guarded_int_gauge_vec_with_registry!(
257 get_metric_name("p95"),
258 "95th percentile",
259 &["id", "client_id", "broker", "topic"],
260 registry
261 )
262 .unwrap();
263 let p99 = register_guarded_int_gauge_vec_with_registry!(
264 get_metric_name("p99"),
265 "99th percentile",
266 &["id", "client_id", "broker", "topic"],
267 registry
268 )
269 .unwrap();
270 let p99_99 = register_guarded_int_gauge_vec_with_registry!(
271 get_metric_name("p99_99"),
272 "99.99th percentile",
273 &["id", "client_id", "broker", "topic"],
274 registry
275 )
276 .unwrap();
277 let out_of_range = register_guarded_int_gauge_vec_with_registry!(
278 get_metric_name("out_of_range"),
279 "Out of range values",
280 &["id", "client_id", "broker", "topic"],
281 registry
282 )
283 .unwrap();
284
285 Self {
286 registry,
287 min,
288 max,
289 avg,
290 sum,
291 cnt,
292 stddev,
293 hdr_size,
294 p50,
295 p75,
296 p90,
297 p95,
298 p99,
299 p99_99,
300 out_of_range,
301 }
302 }
303
304 pub fn report(&self, id: &str, client_id: &str, broker: &str, topic: &str, stats: &Window) {
305 let labels = [id, client_id, broker, topic];
306
307 self.min.with_guarded_label_values(&labels).set(stats.min);
308 self.max.with_guarded_label_values(&labels).set(stats.max);
309 self.avg.with_guarded_label_values(&labels).set(stats.avg);
310 self.sum.with_guarded_label_values(&labels).set(stats.sum);
311 self.cnt.with_guarded_label_values(&labels).set(stats.cnt);
312 self.stddev
313 .with_guarded_label_values(&labels)
314 .set(stats.stddev);
315 self.hdr_size
316 .with_guarded_label_values(&labels)
317 .set(stats.hdrsize);
318 self.p50.with_guarded_label_values(&labels).set(stats.p50);
319 self.p75.with_guarded_label_values(&labels).set(stats.p75);
320 self.p90.with_guarded_label_values(&labels).set(stats.p90);
321 self.p99_99
322 .with_guarded_label_values(&labels)
323 .set(stats.p99_99);
324 self.out_of_range
325 .with_guarded_label_values(&labels)
326 .set(stats.outofrange);
327 }
328}
329
330impl TopicStats {
331 pub fn new(registry: Registry) -> Self {
332 let metadata_age = register_guarded_int_gauge_vec_with_registry!(
333 "rdkafka_topic_metadata_age",
334 "Age of the topic metadata in milliseconds",
335 &["id", "client_id", "topic"],
336 registry
337 )
338 .unwrap();
339 let batch_size = StatsWindow::new(registry.clone(), "topic_batchsize");
340 let batch_cnt = StatsWindow::new(registry.clone(), "topic_batchcnt");
341 let partitions = PartitionStats::new(registry.clone());
342 Self {
343 registry,
344 metadata_age,
345 batch_size,
346 batch_cnt,
347 partitions,
348 }
349 }
350
351 pub fn report(&self, id: &str, client_id: &str, stats: &Statistics) {
352 for (topic, topic_stats) in &stats.topics {
353 self.report_inner(id, client_id, topic, topic_stats);
354 }
355 }
356
357 fn report_inner(&self, id: &str, client_id: &str, topic: &str, stats: &Topic) {
358 self.metadata_age
359 .with_guarded_label_values(&[id, client_id, topic])
360 .set(stats.metadata_age);
361 self.batch_size
362 .report(id, client_id, "", topic, &stats.batchsize);
363 self.batch_cnt
364 .report(id, client_id, "", topic, &stats.batchcnt);
365 self.partitions.report(id, client_id, topic, stats)
366 }
367}
368
369#[derive(Debug, Clone)]
370pub struct PartitionStats {
371 pub registry: Registry,
372
373 pub msgq_cnt: LabelGuardedIntGaugeVec<4>,
374 pub msgq_bytes: LabelGuardedUintGaugeVec<4>,
375 pub xmit_msgq_cnt: LabelGuardedIntGaugeVec<4>,
376 pub xmit_msgq_bytes: LabelGuardedUintGaugeVec<4>,
377 pub fetchq_cnt: LabelGuardedIntGaugeVec<4>,
378 pub fetchq_size: LabelGuardedUintGaugeVec<4>,
379 pub query_offset: LabelGuardedIntGaugeVec<4>,
380 pub next_offset: LabelGuardedIntGaugeVec<4>,
381 pub app_offset: LabelGuardedIntGaugeVec<4>,
382 pub stored_offset: LabelGuardedIntGaugeVec<4>,
383 pub committed_offset: LabelGuardedIntGaugeVec<4>,
384 pub eof_offset: LabelGuardedIntGaugeVec<4>,
385 pub lo_offset: LabelGuardedIntGaugeVec<4>,
386 pub hi_offset: LabelGuardedIntGaugeVec<4>,
387 pub consumer_lag: LabelGuardedIntGaugeVec<4>,
388 pub consumer_lag_store: LabelGuardedIntGaugeVec<4>,
389 pub txmsgs: LabelGuardedUintGaugeVec<4>,
390 pub txbytes: LabelGuardedUintGaugeVec<4>,
391 pub rxmsgs: LabelGuardedUintGaugeVec<4>,
392 pub rxbytes: LabelGuardedUintGaugeVec<4>,
393 pub msgs: LabelGuardedUintGaugeVec<4>,
394 pub rx_ver_drops: LabelGuardedUintGaugeVec<4>,
395 pub msgs_inflight: LabelGuardedIntGaugeVec<4>,
396 pub next_ack_seq: LabelGuardedIntGaugeVec<4>,
397 pub next_err_seq: LabelGuardedIntGaugeVec<4>,
398 pub acked_msgid: LabelGuardedUintGaugeVec<4>,
399}
400
401impl PartitionStats {
402 pub fn new(registry: Registry) -> Self {
403 let msgq_cnt = register_guarded_int_gauge_vec_with_registry!(
404 "rdkafka_topic_partition_msgq_cnt",
405 "Number of messages in the producer queue",
406 &["id", "client_id", "topic", "partition"],
407 registry
408 )
409 .unwrap();
410 let msgq_bytes = register_guarded_uint_gauge_vec_with_registry!(
411 "rdkafka_topic_partition_msgq_bytes",
412 "Size of messages in the producer queue",
413 &["id", "client_id", "topic", "partition"],
414 registry
415 )
416 .unwrap();
417 let xmit_msgq_cnt = register_guarded_int_gauge_vec_with_registry!(
418 "rdkafka_topic_partition_xmit_msgq_cnt",
419 "Number of messages in the transmit queue",
420 &["id", "client_id", "topic", "partition"],
421 registry
422 )
423 .unwrap();
424 let xmit_msgq_bytes = register_guarded_uint_gauge_vec_with_registry!(
425 "rdkafka_topic_partition_xmit_msgq_bytes",
426 "Size of messages in the transmit queue",
427 &["id", "client_id", "topic", "partition"],
428 registry
429 )
430 .unwrap();
431 let fetchq_cnt = register_guarded_int_gauge_vec_with_registry!(
432 "rdkafka_topic_partition_fetchq_cnt",
433 "Number of messages in the fetch queue",
434 &["id", "client_id", "topic", "partition"],
435 registry
436 )
437 .unwrap();
438 let fetchq_size = register_guarded_uint_gauge_vec_with_registry!(
439 "rdkafka_topic_partition_fetchq_size",
440 "Size of messages in the fetch queue",
441 &["id", "client_id", "topic", "partition"],
442 registry
443 )
444 .unwrap();
445 let query_offset = register_guarded_int_gauge_vec_with_registry!(
446 "rdkafka_topic_partition_query_offset",
447 "Current query offset",
448 &["id", "client_id", "topic", "partition"],
449 registry
450 )
451 .unwrap();
452 let next_offset = register_guarded_int_gauge_vec_with_registry!(
453 "rdkafka_topic_partition_next_offset",
454 "Next offset to query",
455 &["id", "client_id", "topic", "partition"],
456 registry
457 )
458 .unwrap();
459 let app_offset = register_guarded_int_gauge_vec_with_registry!(
460 "rdkafka_topic_partition_app_offset",
461 "Last acknowledged offset",
462 &["id", "client_id", "topic", "partition"],
463 registry
464 )
465 .unwrap();
466 let stored_offset = register_guarded_int_gauge_vec_with_registry!(
467 "rdkafka_topic_partition_stored_offset",
468 "Last stored offset",
469 &["id", "client_id", "topic", "partition"],
470 registry
471 )
472 .unwrap();
473 let committed_offset = register_guarded_int_gauge_vec_with_registry!(
474 "rdkafka_topic_partition_committed_offset",
475 "Last committed offset",
476 &["id", "client_id", "topic", "partition"],
477 registry
478 )
479 .unwrap();
480 let eof_offset = register_guarded_int_gauge_vec_with_registry!(
481 "rdkafka_topic_partition_eof_offset",
482 "Last offset in broker log",
483 &["id", "client_id", "topic", "partition"],
484 registry
485 )
486 .unwrap();
487 let lo_offset = register_guarded_int_gauge_vec_with_registry!(
488 "rdkafka_topic_partition_lo_offset",
489 "Low offset",
490 &["id", "client_id", "topic", "partition"],
491 registry
492 )
493 .unwrap();
494 let hi_offset = register_guarded_int_gauge_vec_with_registry!(
495 "rdkafka_topic_partition_hi_offset",
496 "High offset",
497 &["id", "client_id", "topic", "partition"],
498 registry
499 )
500 .unwrap();
501 let consumer_lag = register_guarded_int_gauge_vec_with_registry!(
502 "rdkafka_topic_partition_consumer_lag",
503 "Consumer lag",
504 &["id", "client_id", "topic", "partition"],
505 registry
506 )
507 .unwrap();
508 let consumer_lag_store = register_guarded_int_gauge_vec_with_registry!(
509 "rdkafka_topic_partition_consumer_lag_store",
510 "Consumer lag stored",
511 &["id", "client_id", "topic", "partition"],
512 registry
513 )
514 .unwrap();
515 let txmsgs = register_guarded_uint_gauge_vec_with_registry!(
516 "rdkafka_topic_partition_txmsgs",
517 "Number of transmitted messages",
518 &["id", "client_id", "topic", "partition"],
519 registry
520 )
521 .unwrap();
522 let txbytes = register_guarded_uint_gauge_vec_with_registry!(
523 "rdkafka_topic_partition_txbytes",
524 "Number of transmitted bytes",
525 &["id", "client_id", "topic", "partition"],
526 registry
527 )
528 .unwrap();
529 let rxmsgs = register_guarded_uint_gauge_vec_with_registry!(
530 "rdkafka_topic_partition_rxmsgs",
531 "Number of received messages",
532 &["id", "client_id", "topic", "partition"],
533 registry
534 )
535 .unwrap();
536 let rxbytes = register_guarded_uint_gauge_vec_with_registry!(
537 "rdkafka_topic_partition_rxbytes",
538 "Number of received bytes",
539 &["id", "client_id", "topic", "partition"],
540 registry
541 )
542 .unwrap();
543 let msgs = register_guarded_uint_gauge_vec_with_registry!(
544 "rdkafka_topic_partition_msgs",
545 "Number of messages in partition",
546 &["id", "client_id", "topic", "partition"],
547 registry
548 )
549 .unwrap();
550 let rx_ver_drops = register_guarded_uint_gauge_vec_with_registry!(
551 "rdkafka_topic_partition_rx_ver_drops",
552 "Number of received messages dropped due to version mismatch",
553 &["id", "client_id", "topic", "partition"],
554 registry
555 )
556 .unwrap();
557 let msgs_inflight = register_guarded_int_gauge_vec_with_registry!(
558 "rdkafka_topic_partition_msgs_inflight",
559 "Number of messages in-flight",
560 &["id", "client_id", "topic", "partition"],
561 registry
562 )
563 .unwrap();
564 let next_ack_seq = register_guarded_int_gauge_vec_with_registry!(
565 "rdkafka_topic_partition_next_ack_seq",
566 "Next ack sequence number",
567 &["id", "client_id", "topic", "partition"],
568 registry
569 )
570 .unwrap();
571 let next_err_seq = register_guarded_int_gauge_vec_with_registry!(
572 "rdkafka_topic_partition_next_err_seq",
573 "Next error sequence number",
574 &["id", "client_id", "topic", "partition"],
575 registry
576 )
577 .unwrap();
578 let acked_msgid = register_guarded_uint_gauge_vec_with_registry!(
579 "rdkafka_topic_partition_acked_msgid",
580 "Acknowledged message ID",
581 &["id", "client_id", "topic", "partition"],
582 registry
583 )
584 .unwrap();
585
586 Self {
587 registry,
588 msgq_cnt,
589 msgq_bytes,
590 xmit_msgq_cnt,
591 xmit_msgq_bytes,
592 fetchq_cnt,
593 fetchq_size,
594 query_offset,
595 next_offset,
596 app_offset,
597 stored_offset,
598 committed_offset,
599 eof_offset,
600 lo_offset,
601 hi_offset,
602 consumer_lag,
603 consumer_lag_store,
604 txmsgs,
605 txbytes,
606 rxmsgs,
607 rxbytes,
608 msgs,
609 rx_ver_drops,
610 msgs_inflight,
611 next_ack_seq,
612 next_err_seq,
613 acked_msgid,
614 }
615 }
616
617 pub fn report(&self, id: &str, client_id: &str, topic: &str, stats: &Topic) {
618 for partition_stats in stats.partitions.values() {
619 self.report_inner(id, client_id, topic, partition_stats);
620 }
621 }
622
623 fn report_inner(&self, id: &str, client_id: &str, topic: &str, stats: &Partition) {
624 let labels = [id, client_id, topic, &stats.partition.to_string()];
625
626 self.msgq_cnt
627 .with_guarded_label_values(&labels)
628 .set(stats.msgq_cnt);
629 self.msgq_bytes
630 .with_guarded_label_values(&labels)
631 .set(stats.msgq_bytes);
632 self.xmit_msgq_cnt
633 .with_guarded_label_values(&labels)
634 .set(stats.xmit_msgq_cnt);
635 self.xmit_msgq_bytes
636 .with_guarded_label_values(&labels)
637 .set(stats.xmit_msgq_bytes);
638 self.fetchq_cnt
639 .with_guarded_label_values(&labels)
640 .set(stats.fetchq_cnt);
641 self.fetchq_size
642 .with_guarded_label_values(&labels)
643 .set(stats.fetchq_size);
644 self.query_offset
645 .with_guarded_label_values(&labels)
646 .set(stats.query_offset);
647 self.next_offset
648 .with_guarded_label_values(&labels)
649 .set(stats.next_offset);
650 self.app_offset
651 .with_guarded_label_values(&labels)
652 .set(stats.app_offset);
653 self.stored_offset
654 .with_guarded_label_values(&labels)
655 .set(stats.stored_offset);
656 self.committed_offset
657 .with_guarded_label_values(&labels)
658 .set(stats.committed_offset);
659 self.eof_offset
660 .with_guarded_label_values(&labels)
661 .set(stats.eof_offset);
662 self.lo_offset
663 .with_guarded_label_values(&labels)
664 .set(stats.lo_offset);
665 self.hi_offset
666 .with_guarded_label_values(&labels)
667 .set(stats.hi_offset);
668 self.consumer_lag
669 .with_guarded_label_values(&labels)
670 .set(stats.consumer_lag);
671 self.consumer_lag_store
672 .with_guarded_label_values(&labels)
673 .set(stats.consumer_lag_stored);
674 self.txmsgs
675 .with_guarded_label_values(&labels)
676 .set(stats.txmsgs);
677 self.txbytes
678 .with_guarded_label_values(&labels)
679 .set(stats.txbytes);
680 self.rxmsgs
681 .with_guarded_label_values(&labels)
682 .set(stats.rxmsgs);
683 self.rxbytes
684 .with_guarded_label_values(&labels)
685 .set(stats.rxbytes);
686 self.msgs.with_guarded_label_values(&labels).set(stats.msgs);
687 self.rx_ver_drops
688 .with_guarded_label_values(&labels)
689 .set(stats.rx_ver_drops);
690 self.msgs_inflight
691 .with_guarded_label_values(&labels)
692 .set(stats.msgs_inflight);
693 self.next_ack_seq
694 .with_guarded_label_values(&labels)
695 .set(stats.next_ack_seq);
696 self.next_err_seq
697 .with_guarded_label_values(&labels)
698 .set(stats.next_err_seq);
699 self.acked_msgid
700 .with_guarded_label_values(&labels)
701 .set(stats.acked_msgid);
702 }
703}
704
705impl RdKafkaStats {
706 pub fn new(registry: Registry) -> Self {
707 let ts = register_guarded_int_gauge_vec_with_registry!(
708 "rdkafka_top_ts",
709 "librdkafka's internal monotonic clock (microseconds)",
710 &["id", "client_id"],
713 registry
714 )
715 .unwrap();
716 let time = register_guarded_int_gauge_vec_with_registry!(
717 "rdkafka_top_time",
718 "Wall clock time in seconds since the epoch",
719 &["id", "client_id"],
720 registry
721 )
722 .unwrap();
723 let age = register_guarded_int_gauge_vec_with_registry!(
724 "rdkafka_top_age",
725 "Age of the topic metadata in milliseconds",
726 &["id", "client_id"],
727 registry
728 )
729 .unwrap();
730 let replyq = register_guarded_int_gauge_vec_with_registry!(
731 "rdkafka_top_replyq",
732 "Number of replies waiting to be served",
733 &["id", "client_id"],
734 registry
735 )
736 .unwrap();
737 let msg_cnt = register_guarded_uint_gauge_vec_with_registry!(
738 "rdkafka_top_msg_cnt",
739 "Number of messages in all topics",
740 &["id", "client_id"],
741 registry
742 )
743 .unwrap();
744 let msg_size = register_guarded_uint_gauge_vec_with_registry!(
745 "rdkafka_top_msg_size",
746 "Size of messages in all topics",
747 &["id", "client_id"],
748 registry
749 )
750 .unwrap();
751 let msg_max = register_guarded_uint_gauge_vec_with_registry!(
752 "rdkafka_top_msg_max",
753 "Maximum message size in all topics",
754 &["id", "client_id"],
755 registry
756 )
757 .unwrap();
758 let msg_size_max = register_guarded_uint_gauge_vec_with_registry!(
759 "rdkafka_top_msg_size_max",
760 "Maximum message size in all topics",
761 &["id", "client_id"],
762 registry
763 )
764 .unwrap();
765 let tx = register_guarded_int_gauge_vec_with_registry!(
766 "rdkafka_top_tx",
767 "Number of transmitted messages",
768 &["id", "client_id"],
769 registry
770 )
771 .unwrap();
772 let tx_bytes = register_guarded_int_gauge_vec_with_registry!(
773 "rdkafka_top_tx_bytes",
774 "Number of transmitted bytes",
775 &["id", "client_id"],
776 registry
777 )
778 .unwrap();
779 let rx = register_guarded_int_gauge_vec_with_registry!(
780 "rdkafka_top_rx",
781 "Number of received messages",
782 &["id", "client_id"],
783 registry
784 )
785 .unwrap();
786 let rx_bytes = register_guarded_int_gauge_vec_with_registry!(
787 "rdkafka_top_rx_bytes",
788 "Number of received bytes",
789 &["id", "client_id"],
790 registry
791 )
792 .unwrap();
793 let tx_msgs = register_guarded_int_gauge_vec_with_registry!(
794 "rdkafka_top_tx_msgs",
795 "Number of transmitted messages",
796 &["id", "client_id"],
797 registry
798 )
799 .unwrap();
800 let tx_msgs_bytes = register_guarded_int_gauge_vec_with_registry!(
801 "rdkafka_top_tx_msgs_bytes",
802 "Number of transmitted bytes",
803 &["id", "client_id"],
804 registry
805 )
806 .unwrap();
807 let rx_msgs = register_guarded_int_gauge_vec_with_registry!(
808 "rdkafka_top_rx_msgs",
809 "Number of received messages",
810 &["id", "client_id"],
811 registry
812 )
813 .unwrap();
814 let rx_msgs_bytes = register_guarded_int_gauge_vec_with_registry!(
815 "rdkafka_top_rx_msgs_bytes",
816 "Number of received bytes",
817 &["id", "client_id"],
818 registry
819 )
820 .unwrap();
821 let simple_cnt = register_guarded_int_gauge_vec_with_registry!(
822 "rdkafka_top_simple_cnt",
823 "Number of simple consumer queues",
824 &["id", "client_id"],
825 registry
826 )
827 .unwrap();
828 let metadata_cache_cnt = register_guarded_int_gauge_vec_with_registry!(
829 "rdkafka_top_metadata_cache_cnt",
830 "Number of entries in the metadata cache",
831 &["id", "client_id"],
832 registry
833 )
834 .unwrap();
835
836 let broker_stats = BrokerStats::new(registry.clone());
837 let topic_stats = TopicStats::new(registry.clone());
838 let cgrp = ConsumerGroupStats::new(registry.clone());
839 RdKafkaStats {
840 registry,
841 ts,
842 time,
843 age,
844 replyq,
845 msg_cnt,
846 msg_size,
847 msg_max,
848 msg_size_max,
849 tx,
850 tx_bytes,
851 rx,
852 rx_bytes,
853 tx_msgs,
854 tx_msgs_bytes,
855 rx_msgs,
856 rx_msgs_bytes,
857 simple_cnt,
858 metadata_cache_cnt,
859 broker_stats,
860 topic_stats,
861 cgrp,
862 }
863 }
864
865 pub fn report(&self, id: &str, stats: &Statistics) {
866 let client_id = stats.name.as_str();
867 self.ts
868 .with_guarded_label_values(&[id, client_id])
869 .set(stats.ts);
870 self.time
871 .with_guarded_label_values(&[id, client_id])
872 .set(stats.time);
873 self.age
874 .with_guarded_label_values(&[id, client_id])
875 .set(stats.age);
876 self.replyq
877 .with_guarded_label_values(&[id, client_id])
878 .set(stats.replyq);
879 self.msg_cnt
880 .with_guarded_label_values(&[id, client_id])
881 .set(stats.msg_cnt);
882 self.msg_size
883 .with_guarded_label_values(&[id, client_id])
884 .set(stats.msg_size);
885 self.msg_max
886 .with_guarded_label_values(&[id, client_id])
887 .set(stats.msg_max);
888 self.msg_size_max
889 .with_guarded_label_values(&[id, client_id])
890 .set(stats.msg_size_max);
891 self.tx
892 .with_guarded_label_values(&[id, client_id])
893 .set(stats.tx);
894 self.tx_bytes
895 .with_guarded_label_values(&[id, client_id])
896 .set(stats.tx_bytes);
897 self.rx
898 .with_guarded_label_values(&[id, client_id])
899 .set(stats.rx);
900 self.rx_bytes
901 .with_guarded_label_values(&[id, client_id])
902 .set(stats.rx_bytes);
903 self.tx_msgs
904 .with_guarded_label_values(&[id, client_id])
905 .set(stats.txmsgs);
906 self.tx_msgs_bytes
907 .with_guarded_label_values(&[id, client_id])
908 .set(stats.txmsg_bytes);
909 self.rx_msgs
910 .with_guarded_label_values(&[id, client_id])
911 .set(stats.rxmsgs);
912 self.rx_msgs_bytes
913 .with_guarded_label_values(&[id, client_id])
914 .set(stats.rxmsg_bytes);
915 self.simple_cnt
916 .with_guarded_label_values(&[id, client_id])
917 .set(stats.simple_cnt);
918 self.metadata_cache_cnt
919 .with_guarded_label_values(&[id, client_id])
920 .set(stats.metadata_cache_cnt);
921
922 self.broker_stats.report(id, client_id, stats);
923 self.topic_stats.report(id, client_id, stats);
924 if let Some(cgrp) = &stats.cgrp {
925 self.cgrp.report(id, client_id, cgrp)
926 }
927 }
928}
929
930impl BrokerStats {
931 pub fn new(registry: Registry) -> Self {
932 let state_age = register_guarded_int_gauge_vec_with_registry!(
933 "rdkafka_broker_state_age",
934 "Age of the broker state in seconds",
935 &["id", "client_id", "broker", "state"],
936 registry
937 )
938 .unwrap();
939 let outbuf_cnt = register_guarded_int_gauge_vec_with_registry!(
940 "rdkafka_broker_outbuf_cnt",
941 "Number of messages waiting to be sent to broker",
942 &["id", "client_id", "broker", "state"],
943 registry
944 )
945 .unwrap();
946 let outbuf_msg_cnt = register_guarded_int_gauge_vec_with_registry!(
947 "rdkafka_broker_outbuf_msg_cnt",
948 "Number of messages waiting to be sent to broker",
949 &["id", "client_id", "broker", "state"],
950 registry
951 )
952 .unwrap();
953 let waitresp_cnt = register_guarded_int_gauge_vec_with_registry!(
954 "rdkafka_broker_waitresp_cnt",
955 "Number of requests waiting for response",
956 &["id", "client_id", "broker", "state"],
957 registry
958 )
959 .unwrap();
960 let waitresp_msg_cnt = register_guarded_int_gauge_vec_with_registry!(
961 "rdkafka_broker_waitresp_msg_cnt",
962 "Number of messages waiting for response",
963 &["id", "client_id", "broker", "state"],
964 registry
965 )
966 .unwrap();
967 let tx = register_guarded_uint_gauge_vec_with_registry!(
968 "rdkafka_broker_tx",
969 "Number of transmitted messages",
970 &["id", "client_id", "broker", "state"],
971 registry
972 )
973 .unwrap();
974 let tx_bytes = register_guarded_uint_gauge_vec_with_registry!(
975 "rdkafka_broker_tx_bytes",
976 "Number of transmitted bytes",
977 &["id", "client_id", "broker", "state"],
978 registry
979 )
980 .unwrap();
981 let tx_errs = register_guarded_uint_gauge_vec_with_registry!(
982 "rdkafka_broker_tx_errs",
983 "Number of failed transmitted messages",
984 &["id", "client_id", "broker", "state"],
985 registry
986 )
987 .unwrap();
988 let tx_retries = register_guarded_uint_gauge_vec_with_registry!(
989 "rdkafka_broker_tx_retries",
990 "Number of message retries",
991 &["id", "client_id", "broker", "state"],
992 registry
993 )
994 .unwrap();
995 let tx_idle = register_guarded_int_gauge_vec_with_registry!(
996 "rdkafka_broker_tx_idle",
997 "Number of idle transmit connections",
998 &["id", "client_id", "broker", "state"],
999 registry
1000 )
1001 .unwrap();
1002 let req_timeouts = register_guarded_uint_gauge_vec_with_registry!(
1003 "rdkafka_broker_req_timeouts",
1004 "Number of request timeouts",
1005 &["id", "client_id", "broker", "state"],
1006 registry
1007 )
1008 .unwrap();
1009 let rx = register_guarded_uint_gauge_vec_with_registry!(
1010 "rdkafka_broker_rx",
1011 "Number of received messages",
1012 &["id", "client_id", "broker", "state"],
1013 registry
1014 )
1015 .unwrap();
1016 let rx_bytes = register_guarded_uint_gauge_vec_with_registry!(
1017 "rdkafka_broker_rx_bytes",
1018 "Number of received bytes",
1019 &["id", "client_id", "broker", "state"],
1020 registry
1021 )
1022 .unwrap();
1023 let rx_errs = register_guarded_uint_gauge_vec_with_registry!(
1024 "rdkafka_broker_rx_errs",
1025 "Number of failed received messages",
1026 &["id", "client_id", "broker", "state"],
1027 registry
1028 )
1029 .unwrap();
1030 let rx_corriderrs = register_guarded_uint_gauge_vec_with_registry!(
1031 "rdkafka_broker_rx_corriderrs",
1032 "Number of received messages with invalid correlation id",
1033 &["id", "client_id", "broker", "state"],
1034 registry
1035 )
1036 .unwrap();
1037 let rx_partial = register_guarded_uint_gauge_vec_with_registry!(
1038 "rdkafka_broker_rx_partial",
1039 "Number of partial messages received",
1040 &["id", "client_id", "broker", "state"],
1041 registry
1042 )
1043 .unwrap();
1044 let rx_idle = register_guarded_int_gauge_vec_with_registry!(
1045 "rdkafka_broker_rx_idle",
1046 "Number of idle receive connections",
1047 &["id", "client_id", "broker", "state"],
1048 registry
1049 )
1050 .unwrap();
1051 let req = register_guarded_int_gauge_vec_with_registry!(
1052 "rdkafka_broker_req",
1053 "Number of requests in flight",
1054 &["id", "client_id", "broker", "state", "type"],
1055 registry
1056 )
1057 .unwrap();
1058 let zbuf_grow = register_guarded_uint_gauge_vec_with_registry!(
1059 "rdkafka_broker_zbuf_grow",
1060 "Number of times the broker's output buffer has been reallocated",
1061 &["id", "client_id", "broker", "state"],
1062 registry
1063 )
1064 .unwrap();
1065 let buf_grow = register_guarded_uint_gauge_vec_with_registry!(
1066 "rdkafka_broker_buf_grow",
1067 "Number of times the broker's input buffer has been reallocated",
1068 &["id", "client_id", "broker", "state"],
1069 registry
1070 )
1071 .unwrap();
1072 let wakeups = register_guarded_uint_gauge_vec_with_registry!(
1073 "rdkafka_broker_wakeups",
1074 "Number of wakeups",
1075 &["id", "client_id", "broker", "state"],
1076 registry
1077 )
1078 .unwrap();
1079 let connects = register_guarded_int_gauge_vec_with_registry!(
1080 "rdkafka_broker_connects",
1081 "Number of connection attempts",
1082 &["id", "client_id", "broker", "state"],
1083 registry
1084 )
1085 .unwrap();
1086 let disconnects = register_guarded_int_gauge_vec_with_registry!(
1087 "rdkafka_broker_disconnects",
1088 "Number of disconnects",
1089 &["id", "client_id", "broker", "state"],
1090 registry
1091 )
1092 .unwrap();
1093 let int_latency = StatsWindow::new(registry.clone(), "broker_intlatency");
1094 let outbuf_latency = StatsWindow::new(registry.clone(), "broker_outbuflatency");
1095 let rtt = StatsWindow::new(registry.clone(), "broker_rtt");
1096 let throttle = StatsWindow::new(registry.clone(), "broker_throttle");
1097
1098 BrokerStats {
1099 registry,
1100 state_age,
1101 outbuf_cnt,
1102 outbuf_msg_cnt,
1103 waitresp_cnt,
1104 waitresp_msg_cnt,
1105 tx,
1106 tx_bytes,
1107 tx_errs,
1108 tx_retries,
1109 tx_idle,
1110 req_timeouts,
1111 rx,
1112 rx_bytes,
1113 rx_errs,
1114 rx_corriderrs,
1115 rx_partial,
1116 rx_idle,
1117 req,
1118 zbuf_grow,
1119 buf_grow,
1120 wakeups,
1121 connects,
1122 disconnects,
1123 int_latency,
1124 outbuf_latency,
1125 rtt,
1126 throttle,
1127 }
1128 }
1129
1130 pub fn report(&self, id: &str, client_id: &str, stats: &Statistics) {
1131 for broker_stats in stats.brokers.values() {
1132 self.report_inner(id, client_id, broker_stats);
1133 }
1134 }
1135
1136 fn report_inner(&self, id: &str, client_id: &str, stats: &Broker) {
1137 let broker = stats.nodename.as_str();
1138 let state = stats.state.as_str();
1139 let labels = [id, client_id, broker, state];
1140
1141 self.state_age
1142 .with_guarded_label_values(&labels)
1143 .set(stats.stateage);
1144 self.outbuf_cnt
1145 .with_guarded_label_values(&labels)
1146 .set(stats.outbuf_cnt);
1147 self.outbuf_msg_cnt
1148 .with_guarded_label_values(&labels)
1149 .set(stats.outbuf_msg_cnt);
1150 self.waitresp_cnt
1151 .with_guarded_label_values(&labels)
1152 .set(stats.waitresp_cnt);
1153 self.waitresp_msg_cnt
1154 .with_guarded_label_values(&labels)
1155 .set(stats.waitresp_msg_cnt);
1156 self.tx.with_guarded_label_values(&labels).set(stats.tx);
1157 self.tx_bytes
1158 .with_guarded_label_values(&labels)
1159 .set(stats.txbytes);
1160 self.tx_errs
1161 .with_guarded_label_values(&labels)
1162 .set(stats.txerrs);
1163 self.tx_retries
1164 .with_guarded_label_values(&labels)
1165 .set(stats.txretries);
1166 self.tx_idle
1167 .with_guarded_label_values(&labels)
1168 .set(stats.txidle);
1169 self.req_timeouts
1170 .with_guarded_label_values(&labels)
1171 .set(stats.req_timeouts);
1172 self.rx.with_guarded_label_values(&labels).set(stats.rx);
1173 self.rx_bytes
1174 .with_guarded_label_values(&labels)
1175 .set(stats.rxbytes);
1176 self.rx_errs
1177 .with_guarded_label_values(&labels)
1178 .set(stats.rxerrs);
1179 self.rx_corriderrs
1180 .with_guarded_label_values(&labels)
1181 .set(stats.rxcorriderrs);
1182 self.rx_partial
1183 .with_guarded_label_values(&labels)
1184 .set(stats.rxpartial);
1185 self.rx_idle
1186 .with_guarded_label_values(&labels)
1187 .set(stats.rxidle);
1188 for (req_type, req_cnt) in &stats.req {
1189 self.req
1190 .with_guarded_label_values(&[id, client_id, broker, state, req_type])
1191 .set(*req_cnt);
1192 }
1193 self.zbuf_grow
1194 .with_guarded_label_values(&labels)
1195 .set(stats.zbuf_grow);
1196 self.buf_grow
1197 .with_guarded_label_values(&labels)
1198 .set(stats.buf_grow);
1199 if let Some(wakeups) = stats.wakeups {
1200 self.wakeups.with_guarded_label_values(&labels).set(wakeups);
1201 }
1202 if let Some(connects) = stats.connects {
1203 self.connects
1204 .with_guarded_label_values(&labels)
1205 .set(connects);
1206 }
1207 if let Some(disconnects) = stats.disconnects {
1208 self.disconnects
1209 .with_guarded_label_values(&labels)
1210 .set(disconnects);
1211 }
1212 if let Some(int_latency) = &stats.int_latency {
1213 self.int_latency
1214 .report(id, client_id, broker, "", int_latency);
1215 }
1216 if let Some(outbuf_latency) = &stats.outbuf_latency {
1217 self.outbuf_latency
1218 .report(id, client_id, broker, "", outbuf_latency);
1219 }
1220 if let Some(rtt) = &stats.rtt {
1221 self.rtt.report(id, client_id, broker, "", rtt);
1222 }
1223 if let Some(throttle) = &stats.throttle {
1224 self.throttle.report(id, client_id, broker, "", throttle);
1225 }
1226 }
1227}