risingwave_connector/sink/iceberg/prometheus/
monitored_general_writer.rsuse iceberg::spec::DataFile;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::Result;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
#[derive(Clone)]
pub struct MonitoredGeneralWriterBuilder<B: IcebergWriterBuilder> {
inner: B,
write_qps: LabelGuardedIntCounter<3>,
write_latency: LabelGuardedHistogram<3>,
}
impl<B: IcebergWriterBuilder> MonitoredGeneralWriterBuilder<B> {
pub fn new(
inner: B,
write_qps: LabelGuardedIntCounter<3>,
write_latency: LabelGuardedHistogram<3>,
) -> Self {
Self {
inner,
write_qps,
write_latency,
}
}
}
#[async_trait::async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for MonitoredGeneralWriterBuilder<B> {
type R = MonitoredGeneralWriter<B::R>;
async fn build(self) -> Result<MonitoredGeneralWriter<B::R>> {
let inner = self.inner.build().await?;
Ok(MonitoredGeneralWriter {
inner,
write_qps: self.write_qps,
write_latency: self.write_latency,
})
}
}
pub struct MonitoredGeneralWriter<F: IcebergWriter> {
inner: F,
write_qps: LabelGuardedIntCounter<3>,
write_latency: LabelGuardedHistogram<3>,
}
#[async_trait::async_trait]
impl<F: IcebergWriter> IcebergWriter for MonitoredGeneralWriter<F> {
async fn write(&mut self, record: RecordBatch) -> Result<()> {
self.write_qps.inc();
let _timer = self.write_latency.start_timer();
self.inner.write(record).await
}
async fn close(&mut self) -> Result<Vec<DataFile>> {
self.inner.close().await
}
}