risingwave_connector/sink/iceberg/prometheus/
monitored_general_writer.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use iceberg::Result;
18use iceberg::spec::{DataFile, PartitionKey};
19use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
20use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
21use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
22
23#[derive(Clone)]
24pub struct MonitoredGeneralWriterBuilder<B: IcebergWriterBuilder> {
25    inner: Arc<B>,
26    write_qps: LabelGuardedIntCounter,
27    write_latency: LabelGuardedHistogram,
28}
29
30impl<B: IcebergWriterBuilder> MonitoredGeneralWriterBuilder<B> {
31    pub fn new(
32        inner: B,
33        write_qps: LabelGuardedIntCounter,
34        write_latency: LabelGuardedHistogram,
35    ) -> Self {
36        Self {
37            inner: Arc::new(inner),
38            write_qps,
39            write_latency,
40        }
41    }
42}
43
44#[async_trait::async_trait]
45impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredGeneralWriterBuilder<B> {
46    type R = MonitoredGeneralWriter<B::R>;
47
48    async fn build(
49        &self,
50        partition_key: Option<PartitionKey>,
51    ) -> Result<MonitoredGeneralWriter<B::R>> {
52        let inner = self.inner.build(partition_key).await?;
53        Ok(MonitoredGeneralWriter {
54            inner,
55            write_qps: self.write_qps.clone(),
56            write_latency: self.write_latency.clone(),
57        })
58    }
59}
60
61pub struct MonitoredGeneralWriter<F: IcebergWriter> {
62    inner: F,
63    write_qps: LabelGuardedIntCounter,
64    write_latency: LabelGuardedHistogram,
65}
66
67#[async_trait::async_trait]
68impl<F: IcebergWriter> IcebergWriter for MonitoredGeneralWriter<F> {
69    async fn write(&mut self, record: RecordBatch) -> Result<()> {
70        self.write_qps.inc();
71        let _timer = self.write_latency.start_timer();
72        self.inner.write(record).await
73    }
74
75    async fn close(&mut self) -> Result<Vec<DataFile>> {
76        self.inner.close().await
77    }
78}