risingwave_connector/source/iceberg/
metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24    register_guarded_int_gauge_vec_with_registry,
25};
26
/// Handles to the metrics recorded while scanning an Iceberg source.
///
/// Every field is a label-guarded metric vector. The label names mentioned on
/// each field are the ones the metric is registered with in
/// `IcebergScanMetrics::new`.
#[derive(Clone)]
pub struct IcebergScanMetrics {
    // -- Existing --
    /// Counter registered as "Total size of iceberg read requests".
    /// Labels: `table_name`.
    pub iceberg_read_bytes: LabelGuardedIntCounterVec,

    // -- Snapshot & Discovery (List Executor) --
    /// Time difference (seconds) between the latest available snapshot and
    /// the last ingested snapshot.
    /// Gauge. Labels: `source_id`, `source_name`, `table_name`.
    pub iceberg_source_snapshot_lag_seconds: LabelGuardedIntGaugeVec,

    /// Total number of snapshots discovered via incremental scan.
    /// Counter. Labels: `source_id`, `source_name`, `table_name`.
    pub iceberg_source_snapshots_discovered_total: LabelGuardedIntCounterVec,

    /// Time spent planning files from a snapshot (metadata operation).
    /// Histogram with 15 exponential buckets starting at 10ms and doubling
    /// (largest bound ~164s). Labels: `source_id`, `source_name`, `table_name`.
    pub iceberg_source_list_duration_seconds: LabelGuardedHistogramVec,

    /// Files discovered per scan, labeled by `file_type` (data, `eq_delete`, `pos_delete`).
    /// Counter. Labels: `source_id`, `source_name`, `table_name`, `file_type`.
    pub iceberg_source_files_discovered_total: LabelGuardedIntCounterVec,

    // -- Data Reading (used in scan_task_to_chunk_with_deletes, labeled by table_name) --
    /// Per-file read duration.
    /// Histogram with the same 10ms..~164s exponential buckets as the list
    /// duration. Labels: `table_name`.
    pub iceberg_source_file_read_duration_seconds: LabelGuardedHistogramVec,

    /// Total rows read from Iceberg source.
    /// Counter. Labels: `table_name`.
    pub iceberg_source_rows_read_total: LabelGuardedIntCounterVec,

    /// Total files read from Iceberg source, labeled by `file_type`.
    /// Counter. Labels: `table_name`, `file_type`.
    pub iceberg_source_files_read_total: LabelGuardedIntCounterVec,

    // -- Delete Handling --
    /// Rows removed by delete processing, labeled by `delete_type`.
    /// Counter. Labels: `table_name`, `delete_type`.
    pub iceberg_source_delete_rows_applied_total: LabelGuardedIntCounterVec,

    /// Histogram of delete files attached per data file scan task.
    /// Bucket upper bounds are 1, 2, 4, ..., 128.
    /// Labels: `source_id`, `source_name`, `table_name` (source-level, unlike
    /// the other delete-handling metric above).
    pub iceberg_source_delete_files_per_data_file: LabelGuardedHistogramVec,

    // -- Operational Health --
    /// Number of files currently being fetched by the active reader.
    /// Gauge. Labels: `source_id`, `source_name`, `table_name`.
    pub iceberg_source_inflight_file_count: LabelGuardedIntGaugeVec,

    /// Categorized scan error counter.
    /// Labels: `source_id`, `source_name`, `table_name`, `error_type`.
    pub iceberg_source_scan_errors_total: LabelGuardedIntCounterVec,
}
70
71impl IcebergScanMetrics {
72    fn new(registry: &Registry) -> Self {
73        let iceberg_read_bytes = register_guarded_int_counter_vec_with_registry!(
74            "iceberg_read_bytes",
75            "Total size of iceberg read requests",
76            &["table_name"],
77            registry
78        )
79        .unwrap();
80
81        let iceberg_source_snapshot_lag_seconds = register_guarded_int_gauge_vec_with_registry!(
82            "iceberg_source_snapshot_lag_seconds",
83            "Lag between latest available snapshot and last ingested snapshot in seconds",
84            &["source_id", "source_name", "table_name"],
85            registry
86        )
87        .unwrap();
88
89        let iceberg_source_snapshots_discovered_total =
90            register_guarded_int_counter_vec_with_registry!(
91                "iceberg_source_snapshots_discovered_total",
92                "Total number of snapshots discovered via incremental scan",
93                &["source_id", "source_name", "table_name"],
94                registry
95            )
96            .unwrap();
97
98        let iceberg_source_list_duration_seconds = register_guarded_histogram_vec_with_registry!(
99            histogram_opts!(
100                "iceberg_source_list_duration_seconds",
101                "Time spent planning files from a snapshot",
102                exponential_buckets(0.01, 2.0, 15).unwrap() // 10ms to ~164s
103            ),
104            &["source_id", "source_name", "table_name"],
105            registry
106        )
107        .unwrap();
108
109        let iceberg_source_files_discovered_total =
110            register_guarded_int_counter_vec_with_registry!(
111                "iceberg_source_files_discovered_total",
112                "Total number of files discovered per scan",
113                &["source_id", "source_name", "table_name", "file_type"],
114                registry
115            )
116            .unwrap();
117
118        // Note: file-read metrics use ["table_name"] labels (matching iceberg_read_bytes)
119        // because the scan function doesn't have source-level context.
120        let iceberg_source_file_read_duration_seconds =
121            register_guarded_histogram_vec_with_registry!(
122                histogram_opts!(
123                    "iceberg_source_file_read_duration_seconds",
124                    "Per-file read duration",
125                    exponential_buckets(0.01, 2.0, 15).unwrap() // 10ms to ~164s
126                ),
127                &["table_name"],
128                registry
129            )
130            .unwrap();
131
132        let iceberg_source_rows_read_total = register_guarded_int_counter_vec_with_registry!(
133            "iceberg_source_rows_read_total",
134            "Total rows read from Iceberg source",
135            &["table_name"],
136            registry
137        )
138        .unwrap();
139
140        let iceberg_source_files_read_total = register_guarded_int_counter_vec_with_registry!(
141            "iceberg_source_files_read_total",
142            "Total files read from Iceberg source",
143            &["table_name", "file_type"],
144            registry
145        )
146        .unwrap();
147
148        let iceberg_source_delete_rows_applied_total =
149            register_guarded_int_counter_vec_with_registry!(
150                "iceberg_source_delete_rows_applied_total",
151                "Total rows removed by delete processing",
152                &["table_name", "delete_type"],
153                registry
154            )
155            .unwrap();
156
157        let iceberg_source_delete_files_per_data_file =
158            register_guarded_histogram_vec_with_registry!(
159                histogram_opts!(
160                    "iceberg_source_delete_files_per_data_file",
161                    "Number of delete files attached per data file scan task",
162                    // 1, 2, 4, 8, 16, 32, 64, 128
163                    exponential_buckets(1.0, 2.0, 8).unwrap()
164                ),
165                &["source_id", "source_name", "table_name"],
166                registry
167            )
168            .unwrap();
169
170        let iceberg_source_inflight_file_count = register_guarded_int_gauge_vec_with_registry!(
171            "iceberg_source_inflight_file_count",
172            "Number of files currently being fetched by the active reader",
173            &["source_id", "source_name", "table_name"],
174            registry
175        )
176        .unwrap();
177
178        let iceberg_source_scan_errors_total = register_guarded_int_counter_vec_with_registry!(
179            "iceberg_source_scan_errors_total",
180            "Total number of scan errors categorized by error type",
181            &["source_id", "source_name", "table_name", "error_type"],
182            registry
183        )
184        .unwrap();
185
186        Self {
187            iceberg_read_bytes,
188            iceberg_source_snapshot_lag_seconds,
189            iceberg_source_snapshots_discovered_total,
190            iceberg_source_list_duration_seconds,
191            iceberg_source_files_discovered_total,
192            iceberg_source_file_read_duration_seconds,
193            iceberg_source_rows_read_total,
194            iceberg_source_files_read_total,
195            iceberg_source_delete_rows_applied_total,
196            iceberg_source_delete_files_per_data_file,
197            iceberg_source_inflight_file_count,
198            iceberg_source_scan_errors_total,
199        }
200    }
201
202    pub fn for_test() -> Arc<Self> {
203        Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
204    }
205}
206
207pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
208    LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));