risingwave_connector/sink/
decouple_checkpoint_log_sink.rs1use std::num::NonZeroU64;
16use std::time::Instant;
17
18use async_trait::async_trait;
19
20use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
21use crate::sink::writer::SinkWriter;
22use crate::sink::{LogSinker, Result, SinkLogReader, SinkWriterMetrics};
23
/// Value returned by `default_commit_checkpoint_interval`.
/// Judging by the name, this is the default number of checkpoints between commits when
/// sink decoupling is enabled — NOTE(review): confirm against the callers.
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10;
/// Judging by the name, the default interval when sink decoupling is disabled, i.e. commit
/// on every checkpoint — NOTE(review): not referenced in this part of the file; confirm.
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;

/// Value returned by `iceberg_default_commit_checkpoint_interval`.
pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 60;
/// The literal key `"commit_checkpoint_interval"`.
/// Presumably the name of the user-facing sink option — NOTE(review): confirm against callers.
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";
29
30pub fn default_commit_checkpoint_interval() -> u64 {
31 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE
32}
33
34pub fn iceberg_default_commit_checkpoint_interval() -> u64 {
35 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL
36}
37
/// A log sinker that wraps a sink writer `W` and does not commit on every checkpoint.
///
/// Its `LogSinker` implementation forwards every barrier to the writer, but only passes
/// `true` to `barrier` (and truncates the log afterwards) once `commit_checkpoint_interval`
/// checkpoint barriers have accumulated, or when a checkpoint barrier carries a new vnode
/// bitmap.
pub struct DecoupleCheckpointLogSinkerOf<W> {
    /// The wrapped writer that receives epochs, chunks and barriers.
    writer: W,
    /// Metrics handle; the duration of each committing `barrier(true)` call is observed on
    /// its `sink_commit_duration`.
    sink_writer_metrics: SinkWriterMetrics,
    /// Number of checkpoint barriers that must accumulate before a commit is issued.
    commit_checkpoint_interval: NonZeroU64,
}
46
47impl<W> DecoupleCheckpointLogSinkerOf<W> {
48 pub fn new(
51 writer: W,
52 sink_writer_metrics: SinkWriterMetrics,
53 commit_checkpoint_interval: NonZeroU64,
54 ) -> Self {
55 DecoupleCheckpointLogSinkerOf {
56 writer,
57 sink_writer_metrics,
58 commit_checkpoint_interval,
59 }
60 }
61}
62
63#[async_trait]
64impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
65 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
66 let mut sink_writer = self.writer;
67 log_reader.start_from(None).await?;
68 #[derive(Debug)]
69 enum LogConsumerState {
70 Uninitialized,
72
73 EpochBegun { curr_epoch: u64 },
75
76 BarrierReceived { prev_epoch: u64 },
78 }
79
80 let mut state = LogConsumerState::Uninitialized;
81
82 let mut current_checkpoint: u64 = 0;
83 let commit_checkpoint_interval = self.commit_checkpoint_interval;
84 let sink_writer_metrics = self.sink_writer_metrics;
85
86 loop {
87 let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
88 state = match state {
90 LogConsumerState::Uninitialized => {
91 sink_writer.begin_epoch(epoch).await?;
92 LogConsumerState::EpochBegun { curr_epoch: epoch }
93 }
94 LogConsumerState::EpochBegun { curr_epoch } => {
95 assert!(
96 epoch >= curr_epoch,
97 "new epoch {} should not be below the current epoch {}",
98 epoch,
99 curr_epoch
100 );
101 LogConsumerState::EpochBegun { curr_epoch: epoch }
102 }
103 LogConsumerState::BarrierReceived { prev_epoch, .. } => {
104 assert!(
105 epoch > prev_epoch,
106 "new epoch {} should be greater than prev epoch {}",
107 epoch,
108 prev_epoch
109 );
110
111 sink_writer.begin_epoch(epoch).await?;
112 LogConsumerState::EpochBegun { curr_epoch: epoch }
113 }
114 };
115 match item {
116 LogStoreReadItem::StreamChunk { chunk, .. } => {
117 if let Err(e) = sink_writer.write_batch(chunk).await {
118 sink_writer.abort().await?;
119 return Err(e);
120 }
121 }
122 LogStoreReadItem::Barrier {
123 is_checkpoint,
124 new_vnode_bitmap,
125 ..
126 } => {
127 let prev_epoch = match state {
128 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
129 _ => unreachable!("epoch must have begun before handling barrier"),
130 };
131 if is_checkpoint {
132 current_checkpoint += 1;
133 if current_checkpoint >= commit_checkpoint_interval.get()
134 || new_vnode_bitmap.is_some()
135 {
136 let start_time = Instant::now();
137 sink_writer.barrier(true).await?;
138 sink_writer_metrics
139 .sink_commit_duration
140 .observe(start_time.elapsed().as_millis() as f64);
141 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
142
143 current_checkpoint = 0;
144 } else {
145 sink_writer.barrier(false).await?;
146 }
147 } else {
148 assert!(new_vnode_bitmap.is_none());
149 sink_writer.barrier(false).await?;
150 }
151 state = LogConsumerState::BarrierReceived { prev_epoch }
152 }
153 }
154 }
155 }
156}