risingwave_connector/sink/iceberg/prometheus/
monitored_general_writer.rs1use std::sync::Arc;
16
17use iceberg::Result;
18use iceberg::spec::{DataFile, PartitionKey};
19use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
20use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
21use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
22
23#[derive(Clone)]
24pub struct MonitoredGeneralWriterBuilder<B: IcebergWriterBuilder> {
25 inner: Arc<B>,
26 write_qps: LabelGuardedIntCounter,
27 write_latency: LabelGuardedHistogram,
28}
29
30impl<B: IcebergWriterBuilder> MonitoredGeneralWriterBuilder<B> {
31 pub fn new(
32 inner: B,
33 write_qps: LabelGuardedIntCounter,
34 write_latency: LabelGuardedHistogram,
35 ) -> Self {
36 Self {
37 inner: Arc::new(inner),
38 write_qps,
39 write_latency,
40 }
41 }
42}
43
44#[async_trait::async_trait]
45impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredGeneralWriterBuilder<B> {
46 type R = MonitoredGeneralWriter<B::R>;
47
48 async fn build(
49 &self,
50 partition_key: Option<PartitionKey>,
51 ) -> Result<MonitoredGeneralWriter<B::R>> {
52 let inner = self.inner.build(partition_key).await?;
53 Ok(MonitoredGeneralWriter {
54 inner,
55 write_qps: self.write_qps.clone(),
56 write_latency: self.write_latency.clone(),
57 })
58 }
59}
60
61pub struct MonitoredGeneralWriter<F: IcebergWriter> {
62 inner: F,
63 write_qps: LabelGuardedIntCounter,
64 write_latency: LabelGuardedHistogram,
65}
66
67#[async_trait::async_trait]
68impl<F: IcebergWriter> IcebergWriter for MonitoredGeneralWriter<F> {
69 async fn write(&mut self, record: RecordBatch) -> Result<()> {
70 self.write_qps.inc();
71 let _timer = self.write_latency.start_timer();
72 self.inner.write(record).await
73 }
74
75 async fn close(&mut self) -> Result<Vec<DataFile>> {
76 self.inner.close().await
77 }
78}