risingwave_connector/sink/iceberg/prometheus/
monitored_partition_writer.rs1use iceberg::Result;
16use iceberg::spec::DataFile;
17use iceberg::writer::function_writer::fanout_partition_writer::{
18 FanoutPartitionWriter, FanoutPartitionWriterBuilder,
19};
20use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
21use risingwave_common::array::arrow::arrow_array_iceberg;
22use risingwave_common::metrics::LabelGuardedIntGauge;
23
24#[derive(Clone)]
25pub struct MonitoredFanoutPartitionedWriterBuilder<B: IcebergWriterBuilder> {
26 inner: FanoutPartitionWriterBuilder<B>,
27 partition_num_metrics: LabelGuardedIntGauge<2>,
28}
29
30impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriterBuilder<B> {
31 #[expect(dead_code)]
32 pub fn new(
33 inner: FanoutPartitionWriterBuilder<B>,
34 partition_num: LabelGuardedIntGauge<2>,
35 ) -> Self {
36 Self {
37 inner,
38 partition_num_metrics: partition_num,
39 }
40 }
41}
42
43#[async_trait::async_trait]
44impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredFanoutPartitionedWriterBuilder<B> {
45 type R = MonitoredFanoutPartitionedWriter<B>;
46
47 async fn build(self) -> Result<Self::R> {
48 let writer = self.inner.build().await?;
49 Ok(MonitoredFanoutPartitionedWriter {
50 inner: writer,
51 partition_num_metrics: self.partition_num_metrics,
52 last_partition_num: 0,
53 })
54 }
55}
56
57pub struct MonitoredFanoutPartitionedWriter<B: IcebergWriterBuilder> {
58 inner: FanoutPartitionWriter<B>,
59 partition_num_metrics: LabelGuardedIntGauge<2>,
60 last_partition_num: usize,
61}
62
63impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriter<B> {
64 pub fn update_metrics(&mut self) {
65 let current_partition_num = self.inner.partition_num();
66 let delta = current_partition_num as i64 - self.last_partition_num as i64;
67 self.partition_num_metrics.add(delta);
68 self.last_partition_num = current_partition_num;
69 }
70}
71
72#[async_trait::async_trait]
73impl<B: IcebergWriterBuilder> IcebergWriter for MonitoredFanoutPartitionedWriter<B> {
74 async fn write(&mut self, batch: arrow_array_iceberg::RecordBatch) -> Result<()> {
75 self.inner.write(batch).await?;
76 self.update_metrics();
77 Ok(())
78 }
79
80 async fn close(&mut self) -> Result<Vec<DataFile>> {
81 self.update_metrics();
82 let res = self.inner.close().await?;
83 Ok(res)
84 }
85}