risingwave_connector/sink/iceberg/prometheus/
monitored_general_writer.rs1use iceberg::Result;
16use iceberg::spec::DataFile;
17use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
18use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
19use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
20
21#[derive(Clone)]
22pub struct MonitoredGeneralWriterBuilder<B: IcebergWriterBuilder> {
23 inner: B,
24 write_qps: LabelGuardedIntCounter<3>,
25 write_latency: LabelGuardedHistogram<3>,
26}
27
28impl<B: IcebergWriterBuilder> MonitoredGeneralWriterBuilder<B> {
29 pub fn new(
30 inner: B,
31 write_qps: LabelGuardedIntCounter<3>,
32 write_latency: LabelGuardedHistogram<3>,
33 ) -> Self {
34 Self {
35 inner,
36 write_qps,
37 write_latency,
38 }
39 }
40}
41
42#[async_trait::async_trait]
43impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredGeneralWriterBuilder<B> {
44 type R = MonitoredGeneralWriter<B::R>;
45
46 async fn build(self) -> Result<MonitoredGeneralWriter<B::R>> {
47 let inner = self.inner.build().await?;
48 Ok(MonitoredGeneralWriter {
49 inner,
50 write_qps: self.write_qps,
51 write_latency: self.write_latency,
52 })
53 }
54}
55
56pub struct MonitoredGeneralWriter<F: IcebergWriter> {
57 inner: F,
58 write_qps: LabelGuardedIntCounter<3>,
59 write_latency: LabelGuardedHistogram<3>,
60}
61
62#[async_trait::async_trait]
63impl<F: IcebergWriter> IcebergWriter for MonitoredGeneralWriter<F> {
64 async fn write(&mut self, record: RecordBatch) -> Result<()> {
65 self.write_qps.inc();
66 let _timer = self.write_latency.start_timer();
67 self.inner.write(record).await
68 }
69
70 async fn close(&mut self) -> Result<Vec<DataFile>> {
71 self.inner.close().await
72 }
73}