// risingwave_connector/sink/coordinate.rs
use std::cmp::Ordering;
16use std::future::pending;
17use std::num::NonZeroU64;
18use std::time::Instant;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_pb::connector_service::SinkMetadata;
25use tracing::{info, warn};
26
27use super::{
28 LogSinker, SinkCoordinationRpcClientEnum, SinkLogReader, SinkWriterMetrics, SinkWriterParam,
29};
30use crate::sink::writer::SinkWriter;
31use crate::sink::{LogStoreReadItem, Result, SinkError, SinkParam, TruncateOffset};
32
/// A [`LogSinker`] that forwards log items to an inner [`SinkWriter`] and, on
/// selected checkpoint barriers, commits the writer's metadata through the
/// sink coordinator.
pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
    // Underlying writer that consumes chunks and yields commit metadata on
    // checkpoint barriers.
    writer: W,
    // RPC client used to open the coordination stream with the coordinator.
    sink_coordinate_client: SinkCoordinationRpcClientEnum,
    // Sink parameters; identifies this sink when opening the stream handle.
    param: SinkParam,
    // Vnodes owned by this writer, passed to the coordinator on stream setup.
    vnode_bitmap: Bitmap,
    // Commit once per this many checkpoint barriers, unless an event (writer
    // request, bitmap update, stop, schema change) forces an earlier commit.
    commit_checkpoint_interval: NonZeroU64,
    // Metrics for this writer (e.g. commit duration histogram).
    sink_writer_metrics: SinkWriterMetrics,
}
41
42impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
43 pub async fn new(
44 writer_param: &SinkWriterParam,
45 param: SinkParam,
46 writer: W,
47 commit_checkpoint_interval: NonZeroU64,
48 ) -> Result<Self> {
49 Ok(Self {
50 writer,
51 sink_coordinate_client: writer_param
52 .meta_client
53 .as_ref()
54 .ok_or_else(|| anyhow!("should have meta client"))?
55 .clone()
56 .sink_coordinate_client()
57 .await,
58 param,
59 vnode_bitmap: writer_param
60 .vnode_bitmap
61 .as_ref()
62 .ok_or_else(|| {
63 anyhow!("sink needs coordination and should not have singleton input")
64 })?
65 .clone(),
66 commit_checkpoint_interval,
67 sink_writer_metrics: SinkWriterMetrics::new(writer_param),
68 })
69 }
70}
71
/// Decides whether the coordinator commit must happen on this checkpoint
/// barrier.
///
/// A commit is due either because the configured interval of checkpoint
/// barriers has been reached, or because an event forces it: the writer
/// itself requests a commit, the vnode bitmap is about to change, the sink
/// is stopping, or a schema change arrived.
fn should_commit_on_checkpoint_barrier(
    current_checkpoint: u64,
    commit_checkpoint_interval: NonZeroU64,
    writer_requires_commit: bool,
    vnode_bitmap_updated: bool,
    is_stop: bool,
    has_schema_change: bool,
) -> bool {
    let interval_reached = current_checkpoint >= commit_checkpoint_interval.get();
    let forced =
        writer_requires_commit || vnode_bitmap_updated || is_stop || has_schema_change;
    interval_reached || forced
}
86
#[async_trait]
impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
    /// Drives the log reader forever: writes chunks to the inner writer and,
    /// on commit-worthy checkpoint barriers, sends the writer's metadata to
    /// the coordinator and truncates the log store. Returns only on error
    /// (`Result<!>`); after a coordinated stop it stays pending forever.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        // Open the coordination stream. The coordinator may hand back an epoch
        // to rewind the log store to (Some) or none for a fresh start.
        let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
            .sink_coordinate_client
            .new_stream_handle(&self.param, self.vnode_bitmap)
            .await?;
        let mut sink_writer = self.writer;
        log_reader.start_from(log_store_rewind_start_epoch).await?;
        let mut first_item = log_reader.next_item().await?;
        if let (Some(log_store_rewind_start_epoch), (first_epoch, _)) =
            (log_store_rewind_start_epoch, &first_item)
        {
            // Rewinding: the first epoch read back must be strictly later than
            // the rewind start epoch; anything else is an inconsistency.
            if log_store_rewind_start_epoch >= *first_epoch {
                bail!(
                    "log_store_rewind_start_epoch {} not later than first_epoch {}",
                    log_store_rewind_start_epoch,
                    first_epoch
                );
            }
        } else {
            // Fresh start: agree with the coordinator on a common initial
            // epoch across parallel writers.
            let &(initial_epoch, _) = &first_item;
            let aligned_initial_epoch = coordinator_stream_handle
                .align_initial_epoch(initial_epoch)
                .await?;
            if initial_epoch != aligned_initial_epoch {
                warn!(
                    initial_epoch,
                    aligned_initial_epoch,
                    sink_id = %self.param.sink_id,
                    "initial epoch not matched aligned initial epoch"
                );
                // Skip forward through the log until the aligned epoch is
                // reached. Being already past it is unrecoverable.
                let mut peeked_first = Some(first_item);
                first_item = loop {
                    // Re-examine the item we already read before pulling more.
                    let (epoch, item) = if let Some(peeked_first) = peeked_first.take() {
                        peeked_first
                    } else {
                        log_reader.next_item().await?
                    };
                    match epoch.cmp(&aligned_initial_epoch) {
                        Ordering::Less => {
                            continue;
                        }
                        Ordering::Equal => {
                            break (epoch, item);
                        }
                        Ordering::Greater => {
                            return Err(anyhow!(
                                "initial epoch {} greater than aligned initial epoch {}",
                                initial_epoch,
                                aligned_initial_epoch
                            )
                            .into());
                        }
                    }
                };
            }
        }

        let mut first_item = Some(first_item);

        /// State machine over items consumed from the log store.
        #[derive(Debug)]
        enum LogConsumerState {
            /// Before the first item: no epoch has begun on the writer yet.
            Uninitialized,

            /// `begin_epoch(curr_epoch)` has been called; chunks of this epoch
            /// may be written.
            EpochBegun { curr_epoch: u64 },

            /// A barrier of `prev_epoch` was handled; the next item must carry
            /// a strictly greater epoch.
            BarrierReceived { prev_epoch: u64 },
        }

        let mut state = LogConsumerState::Uninitialized;

        // Checkpoint barriers seen since the last coordinator commit.
        let mut current_checkpoint: u64 = 0;
        let commit_checkpoint_interval = self.commit_checkpoint_interval;
        let sink_writer_metrics = self.sink_writer_metrics;

        loop {
            // First iteration consumes the item obtained during alignment.
            let (epoch, item) = if let Some(item) = first_item.take() {
                item
            } else {
                log_reader.next_item().await?
            };

            // Advance the state machine: begin a new epoch on the writer when
            // needed and assert epoch monotonicity.
            state = match state {
                LogConsumerState::Uninitialized => {
                    sink_writer.begin_epoch(epoch).await?;
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
                LogConsumerState::EpochBegun { curr_epoch } => {
                    assert!(
                        epoch >= curr_epoch,
                        "new epoch {} should not be below the current epoch {}",
                        epoch,
                        curr_epoch
                    );
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
                LogConsumerState::BarrierReceived { prev_epoch, .. } => {
                    assert!(
                        epoch > prev_epoch,
                        "new epoch {} should be greater than prev epoch {}",
                        epoch,
                        prev_epoch
                    );

                    sink_writer.begin_epoch(epoch).await?;
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
            };
            match item {
                LogStoreReadItem::StreamChunk { chunk, .. } => {
                    // On write failure, abort the writer before propagating so
                    // it can discard partially written state.
                    if let Err(e) = sink_writer.write_batch(chunk).await {
                        sink_writer.abort().await?;
                        return Err(e);
                    }
                }
                LogStoreReadItem::Barrier {
                    is_checkpoint,
                    new_vnode_bitmap,
                    is_stop,
                    schema_change,
                } => {
                    let prev_epoch = match state {
                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
                        _ => unreachable!("epoch must have begun before handling barrier"),
                    };
                    if is_checkpoint {
                        current_checkpoint += 1;
                        if should_commit_on_checkpoint_barrier(
                            current_checkpoint,
                            commit_checkpoint_interval,
                            sink_writer.should_commit_on_checkpoint(),
                            new_vnode_bitmap.is_some(),
                            is_stop,
                            schema_change.is_some(),
                        ) {
                            let start_time = Instant::now();
                            // A committing checkpoint barrier must produce
                            // metadata for the coordinator.
                            let metadata = sink_writer.barrier(true).await?;
                            let metadata = metadata.ok_or_else(|| {
                                SinkError::Coordinator(anyhow!(
                                    "should get metadata on checkpoint barrier"
                                ))
                            })?;
                            if schema_change.is_some() {
                                tracing::info!(
                                    sink_id = %self.param.sink_id,
                                    ?schema_change,
                                    "schema change received for coordinated log sinker"
                                );
                                // A schema change is expected to arrive only
                                // together with a stop of the current sink.
                                assert!(
                                    is_stop,
                                    "schema change should stop current sink for sink {}",
                                    self.param.sink_id
                                );
                            }
                            coordinator_stream_handle
                                .commit(epoch, metadata, schema_change)
                                .await?;
                            sink_writer_metrics
                                .sink_commit_duration
                                .observe(start_time.elapsed().as_secs_f64());

                            // A commit starts a fresh checkpoint interval.
                            current_checkpoint = 0;
                            if let Some(new_vnode_bitmap) = new_vnode_bitmap {
                                // Rescaling: register the new bitmap with the
                                // coordinator; it must resume from our epoch.
                                let epoch = coordinator_stream_handle
                                    .update_vnode_bitmap(&new_vnode_bitmap)
                                    .await?;
                                if epoch != prev_epoch {
                                    bail!(
                                        "newly start epoch {} after update vnode bitmap not matched with prev_epoch {}",
                                        epoch,
                                        prev_epoch
                                    );
                                }
                            }
                            if is_stop {
                                coordinator_stream_handle.stop().await?;
                                info!(
                                    sink_id = %self.param.sink_id,
                                    "coordinated log sinker stops"
                                );
                                log_reader.truncate(TruncateOffset::Barrier { epoch })?;
                                // Never return after a coordinated stop; the
                                // future simply stays pending.
                                return pending().await;
                            }
                            // Committed data may be dropped from the log store.
                            log_reader.truncate(TruncateOffset::Barrier { epoch })?;
                        } else {
                            // Checkpoint barrier without a commit: pass a
                            // non-commit barrier to the writer; do NOT truncate
                            // the log, as the data is not yet committed.
                            let metadata = sink_writer.barrier(false).await?;
                            if let Some(metadata) = metadata {
                                warn!(?metadata, "get metadata on non-checkpoint barrier");
                            }
                        }
                    } else {
                        // Non-checkpoint barrier: same non-commit handling.
                        let metadata = sink_writer.barrier(false).await?;
                        if let Some(metadata) = metadata {
                            warn!(?metadata, "get metadata on non-checkpoint barrier");
                        }
                    }
                    state = LogConsumerState::BarrierReceived { prev_epoch }
                }
            }
        }
    }
}
294
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use super::should_commit_on_checkpoint_barrier;

    /// Shorthand: a commit interval of `n` checkpoint barriers.
    fn interval(n: u64) -> NonZeroU64 {
        NonZeroU64::new(n).unwrap()
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_interval() {
        // Reaching the interval commits; staying below it does not.
        assert!(should_commit_on_checkpoint_barrier(
            3,
            interval(3),
            false,
            false,
            false,
            false,
        ));
        assert!(!should_commit_on_checkpoint_barrier(
            2,
            interval(3),
            false,
            false,
            false,
            false,
        ));
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_writer_request() {
        // A writer-requested commit overrides a long interval.
        assert!(should_commit_on_checkpoint_barrier(
            1,
            interval(60),
            true,
            false,
            false,
            false,
        ));
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_forced_events() {
        // Vnode-bitmap update, stop, and schema change each force a commit on
        // their own, regardless of the interval.
        for (vnode_bitmap_updated, is_stop, has_schema_change) in [
            (true, false, false),
            (false, true, false),
            (false, false, true),
        ] {
            assert!(should_commit_on_checkpoint_barrier(
                1,
                interval(60),
                false,
                vnode_bitmap_updated,
                is_stop,
                has_schema_change,
            ));
        }
    }
}