risingwave_connector/sink/iceberg/prometheus/
monitored_general_writer.rs1use iceberg::Result;
16use iceberg::spec::{DataFile, PartitionKey};
17use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
18use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
19use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
20
21#[derive(Clone)]
22pub struct MonitoredGeneralWriterBuilder<B: IcebergWriterBuilder> {
23 inner: B,
24 write_qps: LabelGuardedIntCounter,
25 write_latency: LabelGuardedHistogram,
26}
27
28impl<B: IcebergWriterBuilder> MonitoredGeneralWriterBuilder<B> {
29 pub fn new(
30 inner: B,
31 write_qps: LabelGuardedIntCounter,
32 write_latency: LabelGuardedHistogram,
33 ) -> Self {
34 Self {
35 inner,
36 write_qps,
37 write_latency,
38 }
39 }
40}
41
42#[async_trait::async_trait]
43impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredGeneralWriterBuilder<B> {
44 type R = MonitoredGeneralWriter<B::R>;
45
46 async fn build(
47 self,
48 partition_key: Option<PartitionKey>,
49 ) -> Result<MonitoredGeneralWriter<B::R>> {
50 let inner = self.inner.build(partition_key).await?;
51 Ok(MonitoredGeneralWriter {
52 inner,
53 write_qps: self.write_qps,
54 write_latency: self.write_latency,
55 })
56 }
57}
58
59pub struct MonitoredGeneralWriter<F: IcebergWriter> {
60 inner: F,
61 write_qps: LabelGuardedIntCounter,
62 write_latency: LabelGuardedHistogram,
63}
64
65#[async_trait::async_trait]
66impl<F: IcebergWriter> IcebergWriter for MonitoredGeneralWriter<F> {
67 async fn write(&mut self, record: RecordBatch) -> Result<()> {
68 self.write_qps.inc();
69 let _timer = self.write_latency.start_timer();
70 self.inner.write(record).await
71 }
72
73 async fn close(&mut self) -> Result<Vec<DataFile>> {
74 self.inner.close().await
75 }
76}