// risingwave_connector/sink/iceberg/prometheus/monitored_partition_writer.rs
use iceberg::spec::DataFile;
use iceberg::writer::function_writer::fanout_partition_writer::{
FanoutPartitionWriter, FanoutPartitionWriterBuilder,
};
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::Result;
use risingwave_common::array::arrow::arrow_array_iceberg;
use risingwave_common::metrics::LabelGuardedIntGauge;
/// Builder for [`MonitoredFanoutPartitionedWriter`].
///
/// Pairs a [`FanoutPartitionWriterBuilder`] with the gauge that is handed to
/// the writer produced by `build`.
#[derive(Clone)]
pub struct MonitoredFanoutPartitionedWriterBuilder<B>
where
    B: IcebergWriterBuilder,
{
    /// Wrapped builder that creates the underlying fanout partition writer.
    inner: FanoutPartitionWriterBuilder<B>,
    /// Gauge passed on unchanged to the built writer.
    partition_num_metrics: LabelGuardedIntGauge<2>,
}
impl<B> MonitoredFanoutPartitionedWriterBuilder<B>
where
    B: IcebergWriterBuilder,
{
    /// Creates a monitored builder from an existing fanout partition writer
    /// builder and the gauge that should be updated by the built writer.
    ///
    /// * `inner` - builder for the wrapped fanout partition writer.
    /// * `partition_num` - gauge stored as `partition_num_metrics`.
    #[expect(dead_code)]
    pub fn new(
        inner: FanoutPartitionWriterBuilder<B>,
        partition_num: LabelGuardedIntGauge<2>,
    ) -> Self {
        // Rebind so the struct can be built with field-init shorthand.
        let partition_num_metrics = partition_num;
        Self {
            inner,
            partition_num_metrics,
        }
    }
}
#[async_trait::async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredFanoutPartitionedWriterBuilder<B> {
type R = MonitoredFanoutPartitionedWriter<B>;
async fn build(self) -> Result<Self::R> {
let writer = self.inner.build().await?;
Ok(MonitoredFanoutPartitionedWriter {
inner: writer,
partition_num_metrics: self.partition_num_metrics,
last_partition_num: 0,
})
}
}
/// A [`FanoutPartitionWriter`] wrapper that mirrors the inner writer's
/// `partition_num()` into a gauge.
pub struct MonitoredFanoutPartitionedWriter<B>
where
    B: IcebergWriterBuilder,
{
    /// The wrapped writer that performs the actual writes.
    inner: FanoutPartitionWriter<B>,
    /// Gauge adjusted by the change in partition count on every update.
    partition_num_metrics: LabelGuardedIntGauge<2>,
    /// Partition count seen at the most recent `update_metrics` call.
    last_partition_num: usize,
}
impl<B> MonitoredFanoutPartitionedWriter<B>
where
    B: IcebergWriterBuilder,
{
    /// Brings the gauge in line with the inner writer's current partition
    /// count.
    ///
    /// The gauge is moved by the signed difference between the current
    /// `partition_num()` and the count recorded by the previous call, and the
    /// current count is then remembered for the next call.
    pub fn update_metrics(&mut self) {
        let observed = self.inner.partition_num();
        // Store the new count and get back the one recorded last time.
        let previous = std::mem::replace(&mut self.last_partition_num, observed);
        self.partition_num_metrics
            .add(observed as i64 - previous as i64);
    }
}
#[async_trait::async_trait]
impl<B> IcebergWriter for MonitoredFanoutPartitionedWriter<B>
where
    B: IcebergWriterBuilder,
{
    /// Forwards `batch` to the inner writer and, if that succeeds, refreshes
    /// the partition gauge.
    ///
    /// # Errors
    ///
    /// Returns the inner writer's error; in that case the gauge is not touched.
    async fn write(&mut self, batch: arrow_array_iceberg::RecordBatch) -> Result<()> {
        self.inner.write(batch).await?;
        // Only reached when the write succeeded.
        self.update_metrics();
        Ok(())
    }

    /// Refreshes the partition gauge and then closes the inner writer,
    /// returning the data files it reports.
    ///
    /// # Errors
    ///
    /// Returns the inner writer's error from `close`.
    async fn close(&mut self) -> Result<Vec<DataFile>> {
        // The gauge is refreshed before the inner writer is closed.
        self.update_metrics();
        self.inner.close().await
    }
}