risingwave_connector/sink/iceberg/prometheus/
monitored_general_writer.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use iceberg::Result;
16use iceberg::spec::DataFile;
17use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
18use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
19use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
20
21#[derive(Clone)]
22pub struct MonitoredGeneralWriterBuilder<B: IcebergWriterBuilder> {
23    inner: B,
24    write_qps: LabelGuardedIntCounter<3>,
25    write_latency: LabelGuardedHistogram<3>,
26}
27
28impl<B: IcebergWriterBuilder> MonitoredGeneralWriterBuilder<B> {
29    pub fn new(
30        inner: B,
31        write_qps: LabelGuardedIntCounter<3>,
32        write_latency: LabelGuardedHistogram<3>,
33    ) -> Self {
34        Self {
35            inner,
36            write_qps,
37            write_latency,
38        }
39    }
40}
41
42#[async_trait::async_trait]
43impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredGeneralWriterBuilder<B> {
44    type R = MonitoredGeneralWriter<B::R>;
45
46    async fn build(self) -> Result<MonitoredGeneralWriter<B::R>> {
47        let inner = self.inner.build().await?;
48        Ok(MonitoredGeneralWriter {
49            inner,
50            write_qps: self.write_qps,
51            write_latency: self.write_latency,
52        })
53    }
54}
55
56pub struct MonitoredGeneralWriter<F: IcebergWriter> {
57    inner: F,
58    write_qps: LabelGuardedIntCounter<3>,
59    write_latency: LabelGuardedHistogram<3>,
60}
61
62#[async_trait::async_trait]
63impl<F: IcebergWriter> IcebergWriter for MonitoredGeneralWriter<F> {
64    async fn write(&mut self, record: RecordBatch) -> Result<()> {
65        self.write_qps.inc();
66        let _timer = self.write_latency.start_timer();
67        self.inner.write(record).await
68    }
69
70    async fn close(&mut self) -> Result<Vec<DataFile>> {
71        self.inner.close().await
72    }
73}