risingwave_connector/sink/iceberg/prometheus/
monitored_position_delete_writer.rs1use iceberg::Result;
16use iceberg::spec::DataFile;
17use iceberg::writer::base_writer::sort_position_delete_writer::{
18 PositionDeleteInput, SortPositionDeleteWriter, SortPositionDeleteWriterBuilder,
19};
20use iceberg::writer::file_writer::FileWriterBuilder;
21use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
22use risingwave_common::metrics::LabelGuardedIntGauge;
23
24#[derive(Clone)]
25pub struct MonitoredPositionDeleteWriterBuilder<B: FileWriterBuilder> {
26 cache_row_metrics: LabelGuardedIntGauge<3>,
27 inner: SortPositionDeleteWriterBuilder<B>,
28}
29
30impl<B: FileWriterBuilder> MonitoredPositionDeleteWriterBuilder<B> {
31 pub fn new(
32 inner: SortPositionDeleteWriterBuilder<B>,
33 cache_row_metrics: LabelGuardedIntGauge<3>,
34 ) -> Self {
35 Self {
36 cache_row_metrics,
37 inner,
38 }
39 }
40}
41
42#[async_trait::async_trait]
43impl<B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput>
44 for MonitoredPositionDeleteWriterBuilder<B>
45{
46 type R = MonitoredPositionDeleteWriter<B>;
47
48 async fn build(self) -> Result<Self::R> {
49 let writer = self.inner.build().await?;
50 Ok(MonitoredPositionDeleteWriter {
51 writer,
52 cache_row_metrics: self.cache_row_metrics,
53 last_cache_row: 0,
54 })
55 }
56}
57
58pub struct MonitoredPositionDeleteWriter<B: FileWriterBuilder> {
59 writer: SortPositionDeleteWriter<B>,
60
61 cache_row_metrics: LabelGuardedIntGauge<3>,
63 last_cache_row: usize,
64}
65
66impl<B: FileWriterBuilder> MonitoredPositionDeleteWriter<B> {
67 fn update_metrics(&mut self) {
68 self.cache_row_metrics
69 .add(self.writer.current_cache_number() as i64 - self.last_cache_row as i64);
70 self.last_cache_row = self.writer.current_cache_number();
71 }
72}
73
74#[async_trait::async_trait]
75impl<B: FileWriterBuilder> IcebergWriter<PositionDeleteInput> for MonitoredPositionDeleteWriter<B> {
76 async fn write(&mut self, input: PositionDeleteInput) -> Result<()> {
77 self.writer.write(input).await?;
78 self.update_metrics();
79 Ok(())
80 }
81
82 async fn close(&mut self) -> Result<Vec<DataFile>> {
83 self.update_metrics();
84 let res = self.writer.close().await?;
85 Ok(res)
86 }
87}