risingwave_connector/sink/iceberg/prometheus/
monitored_partition_writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use iceberg::Result;
16use iceberg::spec::DataFile;
17use iceberg::writer::function_writer::fanout_partition_writer::{
18    FanoutPartitionWriter, FanoutPartitionWriterBuilder,
19};
20use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
21use risingwave_common::array::arrow::arrow_array_iceberg;
22use risingwave_common::metrics::LabelGuardedIntGauge;
23
24#[derive(Clone)]
25pub struct MonitoredFanoutPartitionedWriterBuilder<B: IcebergWriterBuilder> {
26    inner: FanoutPartitionWriterBuilder<B>,
27    partition_num_metrics: LabelGuardedIntGauge<2>,
28}
29
30impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriterBuilder<B> {
31    #[expect(dead_code)]
32    pub fn new(
33        inner: FanoutPartitionWriterBuilder<B>,
34        partition_num: LabelGuardedIntGauge<2>,
35    ) -> Self {
36        Self {
37            inner,
38            partition_num_metrics: partition_num,
39        }
40    }
41}
42
43#[async_trait::async_trait]
44impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredFanoutPartitionedWriterBuilder<B> {
45    type R = MonitoredFanoutPartitionedWriter<B>;
46
47    async fn build(self) -> Result<Self::R> {
48        let writer = self.inner.build().await?;
49        Ok(MonitoredFanoutPartitionedWriter {
50            inner: writer,
51            partition_num_metrics: self.partition_num_metrics,
52            last_partition_num: 0,
53        })
54    }
55}
56
57pub struct MonitoredFanoutPartitionedWriter<B: IcebergWriterBuilder> {
58    inner: FanoutPartitionWriter<B>,
59    partition_num_metrics: LabelGuardedIntGauge<2>,
60    last_partition_num: usize,
61}
62
63impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriter<B> {
64    pub fn update_metrics(&mut self) {
65        let current_partition_num = self.inner.partition_num();
66        let delta = current_partition_num as i64 - self.last_partition_num as i64;
67        self.partition_num_metrics.add(delta);
68        self.last_partition_num = current_partition_num;
69    }
70}
71
72#[async_trait::async_trait]
73impl<B: IcebergWriterBuilder> IcebergWriter for MonitoredFanoutPartitionedWriter<B> {
74    async fn write(&mut self, batch: arrow_array_iceberg::RecordBatch) -> Result<()> {
75        self.inner.write(batch).await?;
76        self.update_metrics();
77        Ok(())
78    }
79
80    async fn close(&mut self) -> Result<Vec<DataFile>> {
81        self.update_metrics();
82        let res = self.inner.close().await?;
83        Ok(res)
84    }
85}