// risingwave_connector/sink/iceberg/prometheus/monitored_write_writer.rs
use async_trait::async_trait;
use icelake::io_v2::{IcebergWriter, IcebergWriterBuilder};
use icelake::Result;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::arrow_schema_iceberg::SchemaRef;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
/// Builder that wraps another [`IcebergWriterBuilder`] and carries the
/// metrics that the resulting [`MonitoredWriteWriter`] will update.
#[derive(Clone)]
pub struct MonitoredWriteWriterBuilder<B>
where
    B: IcebergWriterBuilder,
{
    /// The wrapped builder that produces the actual writer.
    inner: B,
    /// Counter passed on to the built writer.
    write_qps: LabelGuardedIntCounter<2>,
    /// Histogram passed on to the built writer.
    write_latency: LabelGuardedHistogram<2>,
}
impl<B: IcebergWriterBuilder> MonitoredWriteWriterBuilder<B> {
#[expect(dead_code)]
pub fn new(
inner: B,
write_qps: LabelGuardedIntCounter<2>,
write_latency: LabelGuardedHistogram<2>,
) -> Self {
Self {
inner,
write_qps,
write_latency,
}
}
}
#[async_trait::async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredWriteWriterBuilder<B> {
type R = MonitoredWriteWriter<B::R>;
async fn build(self, schema: &SchemaRef) -> Result<Self::R> {
let appender = self.inner.build(schema).await?;
Ok(MonitoredWriteWriter {
appender,
write_qps: self.write_qps,
write_latency: self.write_latency,
})
}
}
/// An [`IcebergWriter`] wrapper that updates write metrics around the
/// wrapped writer's `write` call.
pub struct MonitoredWriteWriter<F>
where
    F: IcebergWriter,
{
    /// The wrapped writer that every call is forwarded to.
    appender: F,
    /// Counter incremented once per `write` call.
    write_qps: LabelGuardedIntCounter<2>,
    /// Histogram on which a timer is started for each `write` call.
    write_latency: LabelGuardedHistogram<2>,
}
#[async_trait]
impl<F> IcebergWriter for MonitoredWriteWriter<F>
where
    F: IcebergWriter,
{
    type R = F::R;

    /// Increments the write counter, starts a latency timer, and forwards
    /// `record` to the wrapped writer.
    ///
    /// # Errors
    ///
    /// Returns the wrapped writer's error unchanged.
    async fn write(&mut self, record: RecordBatch) -> Result<()> {
        self.write_qps.inc();
        // Kept in a named local so the timer stays alive until this
        // function returns; presumably it records the elapsed time when
        // dropped — confirm against the histogram type.
        let _latency_guard = self.write_latency.start_timer();
        self.appender.write(record).await
    }

    /// Forwards the flush to the wrapped writer. No metric is updated here.
    ///
    /// # Errors
    ///
    /// Returns the wrapped writer's error unchanged.
    async fn flush(&mut self) -> Result<Vec<Self::R>> {
        self.appender.flush().await
    }
}