risingwave_connector/sink/iceberg/prometheus/
monitored_position_delete_writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use iceberg::Result;
16use iceberg::spec::DataFile;
17use iceberg::writer::base_writer::sort_position_delete_writer::{
18    PositionDeleteInput, SortPositionDeleteWriter, SortPositionDeleteWriterBuilder,
19};
20use iceberg::writer::file_writer::FileWriterBuilder;
21use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
22use risingwave_common::metrics::LabelGuardedIntGauge;
23
24#[derive(Clone)]
25pub struct MonitoredPositionDeleteWriterBuilder<B: FileWriterBuilder> {
26    cache_row_metrics: LabelGuardedIntGauge<3>,
27    inner: SortPositionDeleteWriterBuilder<B>,
28}
29
30impl<B: FileWriterBuilder> MonitoredPositionDeleteWriterBuilder<B> {
31    pub fn new(
32        inner: SortPositionDeleteWriterBuilder<B>,
33        cache_row_metrics: LabelGuardedIntGauge<3>,
34    ) -> Self {
35        Self {
36            cache_row_metrics,
37            inner,
38        }
39    }
40}
41
42#[async_trait::async_trait]
43impl<B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput>
44    for MonitoredPositionDeleteWriterBuilder<B>
45{
46    type R = MonitoredPositionDeleteWriter<B>;
47
48    async fn build(self) -> Result<Self::R> {
49        let writer = self.inner.build().await?;
50        Ok(MonitoredPositionDeleteWriter {
51            writer,
52            cache_row_metrics: self.cache_row_metrics,
53            last_cache_row: 0,
54        })
55    }
56}
57
58pub struct MonitoredPositionDeleteWriter<B: FileWriterBuilder> {
59    writer: SortPositionDeleteWriter<B>,
60
61    // metrics
62    cache_row_metrics: LabelGuardedIntGauge<3>,
63    last_cache_row: usize,
64}
65
66impl<B: FileWriterBuilder> MonitoredPositionDeleteWriter<B> {
67    fn update_metrics(&mut self) {
68        self.cache_row_metrics
69            .add(self.writer.current_cache_number() as i64 - self.last_cache_row as i64);
70        self.last_cache_row = self.writer.current_cache_number();
71    }
72}
73
74#[async_trait::async_trait]
75impl<B: FileWriterBuilder> IcebergWriter<PositionDeleteInput> for MonitoredPositionDeleteWriter<B> {
76    async fn write(&mut self, input: PositionDeleteInput) -> Result<()> {
77        self.writer.write(input).await?;
78        self.update_metrics();
79        Ok(())
80    }
81
82    async fn close(&mut self) -> Result<Vec<DataFile>> {
83        self.update_metrics();
84        let res = self.writer.close().await?;
85        Ok(res)
86    }
87}