risingwave_connector/sink/
coordinate.rs1use std::cmp::Ordering;
16use std::future::pending;
17use std::num::NonZeroU64;
18use std::time::Instant;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_pb::connector_service::SinkMetadata;
25use tracing::{info, warn};
26
27use super::{
28 LogSinker, SinkCoordinationRpcClientEnum, SinkLogReader, SinkWriterMetrics, SinkWriterParam,
29};
30use crate::sink::writer::SinkWriter;
31use crate::sink::{LogStoreReadItem, Result, SinkError, SinkParam, TruncateOffset};
32
33pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
34 writer: W,
35 sink_coordinate_client: SinkCoordinationRpcClientEnum,
36 param: SinkParam,
37 vnode_bitmap: Bitmap,
38 commit_checkpoint_interval: NonZeroU64,
39 sink_writer_metrics: SinkWriterMetrics,
40}
41
42impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
43 pub async fn new(
44 writer_param: &SinkWriterParam,
45 param: SinkParam,
46 writer: W,
47 commit_checkpoint_interval: NonZeroU64,
48 ) -> Result<Self> {
49 Ok(Self {
50 writer,
51 sink_coordinate_client: writer_param
52 .meta_client
53 .as_ref()
54 .ok_or_else(|| anyhow!("should have meta client"))?
55 .clone()
56 .sink_coordinate_client()
57 .await,
58 param,
59 vnode_bitmap: writer_param
60 .vnode_bitmap
61 .as_ref()
62 .ok_or_else(|| {
63 anyhow!("sink needs coordination and should not have singleton input")
64 })?
65 .clone(),
66 commit_checkpoint_interval,
67 sink_writer_metrics: SinkWriterMetrics::new(writer_param),
68 })
69 }
70}
71
/// Decides whether a checkpoint barrier must trigger a commit through the coordinator.
///
/// A commit is forced by a vnode-bitmap update, a stop barrier, or a schema change.
/// Otherwise a commit happens once `current_checkpoint` (checkpoints seen since the last
/// commit, including this one) reaches `commit_checkpoint_interval`.
fn should_commit_on_checkpoint_barrier(
    current_checkpoint: u64,
    commit_checkpoint_interval: NonZeroU64,
    vnode_bitmap_updated: bool,
    is_stop: bool,
    has_schema_change: bool,
) -> bool {
    let interval_reached = current_checkpoint >= commit_checkpoint_interval.get();
    let forced = vnode_bitmap_updated || is_stop || has_schema_change;
    interval_reached || forced
}
84
85#[async_trait]
86impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
87 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
88 let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
89 .sink_coordinate_client
90 .new_stream_handle(&self.param, self.vnode_bitmap)
91 .await?;
92 let mut sink_writer = self.writer;
93 log_reader.start_from(log_store_rewind_start_epoch).await?;
94 let mut first_item = log_reader.next_item().await?;
95 if let (Some(log_store_rewind_start_epoch), (first_epoch, _)) =
96 (log_store_rewind_start_epoch, &first_item)
97 {
98 if log_store_rewind_start_epoch >= *first_epoch {
99 bail!(
100 "log_store_rewind_start_epoch {} not later than first_epoch {}",
101 log_store_rewind_start_epoch,
102 first_epoch
103 );
104 }
105 } else {
106 let &(initial_epoch, _) = &first_item;
107 let aligned_initial_epoch = coordinator_stream_handle
108 .align_initial_epoch(initial_epoch)
109 .await?;
110 if initial_epoch != aligned_initial_epoch {
111 warn!(
112 initial_epoch,
113 aligned_initial_epoch,
114 sink_id = %self.param.sink_id,
115 "initial epoch not matched aligned initial epoch"
116 );
117 let mut peeked_first = Some(first_item);
118 first_item = loop {
119 let (epoch, item) = if let Some(peeked_first) = peeked_first.take() {
120 peeked_first
121 } else {
122 log_reader.next_item().await?
123 };
124 match epoch.cmp(&aligned_initial_epoch) {
125 Ordering::Less => {
126 continue;
127 }
128 Ordering::Equal => {
129 break (epoch, item);
130 }
131 Ordering::Greater => {
132 return Err(anyhow!(
133 "initial epoch {} greater than aligned initial epoch {}",
134 initial_epoch,
135 aligned_initial_epoch
136 )
137 .into());
138 }
139 }
140 };
141 }
142 }
143
144 let mut first_item = Some(first_item);
145
146 #[derive(Debug)]
147 enum LogConsumerState {
148 Uninitialized,
150
151 EpochBegun { curr_epoch: u64 },
153
154 BarrierReceived { prev_epoch: u64 },
156 }
157
158 let mut state = LogConsumerState::Uninitialized;
159
160 let mut current_checkpoint: u64 = 0;
161 let commit_checkpoint_interval = self.commit_checkpoint_interval;
162 let sink_writer_metrics = self.sink_writer_metrics;
163
164 loop {
165 let (epoch, item) = if let Some(item) = first_item.take() {
166 item
167 } else {
168 log_reader.next_item().await?
169 };
170
171 state = match state {
173 LogConsumerState::Uninitialized => {
174 sink_writer.begin_epoch(epoch).await?;
175 LogConsumerState::EpochBegun { curr_epoch: epoch }
176 }
177 LogConsumerState::EpochBegun { curr_epoch } => {
178 assert!(
179 epoch >= curr_epoch,
180 "new epoch {} should not be below the current epoch {}",
181 epoch,
182 curr_epoch
183 );
184 LogConsumerState::EpochBegun { curr_epoch: epoch }
185 }
186 LogConsumerState::BarrierReceived { prev_epoch, .. } => {
187 assert!(
188 epoch > prev_epoch,
189 "new epoch {} should be greater than prev epoch {}",
190 epoch,
191 prev_epoch
192 );
193
194 sink_writer.begin_epoch(epoch).await?;
195 LogConsumerState::EpochBegun { curr_epoch: epoch }
196 }
197 };
198 match item {
199 LogStoreReadItem::StreamChunk { chunk, .. } => {
200 if let Err(e) = sink_writer.write_batch(chunk).await {
201 sink_writer.abort().await?;
202 return Err(e);
203 }
204 }
205 LogStoreReadItem::Barrier {
206 is_checkpoint,
207 new_vnode_bitmap,
208 is_stop,
209 schema_change,
210 } => {
211 let prev_epoch = match state {
212 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
213 _ => unreachable!("epoch must have begun before handling barrier"),
214 };
215 if is_checkpoint {
216 current_checkpoint += 1;
217 if should_commit_on_checkpoint_barrier(
218 current_checkpoint,
219 commit_checkpoint_interval,
220 new_vnode_bitmap.is_some(),
221 is_stop,
222 schema_change.is_some(),
223 ) {
224 let start_time = Instant::now();
225 let metadata = sink_writer.barrier(true).await?;
226 let metadata = metadata.ok_or_else(|| {
227 SinkError::Coordinator(anyhow!(
228 "should get metadata on checkpoint barrier"
229 ))
230 })?;
231 if schema_change.is_some() {
232 tracing::info!(
233 sink_id = %self.param.sink_id,
234 ?schema_change,
235 "schema change received for coordinated log sinker"
236 );
237 assert!(
238 is_stop,
239 "schema change should stop current sink for sink {}",
240 self.param.sink_id
241 );
242 }
243 coordinator_stream_handle
244 .commit(epoch, metadata, schema_change)
245 .await?;
246 sink_writer_metrics
247 .sink_commit_duration
248 .observe(start_time.elapsed().as_secs_f64());
249
250 current_checkpoint = 0;
251 if let Some(new_vnode_bitmap) = new_vnode_bitmap {
252 let epoch = coordinator_stream_handle
253 .update_vnode_bitmap(&new_vnode_bitmap)
254 .await?;
255 if epoch != prev_epoch {
256 bail!(
257 "newly start epoch {} after update vnode bitmap not matched with prev_epoch {}",
258 epoch,
259 prev_epoch
260 );
261 }
262 }
263 if is_stop {
264 coordinator_stream_handle.stop().await?;
265 info!(
266 sink_id = %self.param.sink_id,
267 "coordinated log sinker stops"
268 );
269 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
270 return pending().await;
271 }
272 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
273 } else {
274 let metadata = sink_writer.barrier(false).await?;
275 if let Some(metadata) = metadata {
276 warn!(?metadata, "get metadata on non-checkpoint barrier");
277 }
278 }
279 } else {
280 let metadata = sink_writer.barrier(false).await?;
281 if let Some(metadata) = metadata {
282 warn!(?metadata, "get metadata on non-checkpoint barrier");
283 }
284 }
285 state = LogConsumerState::BarrierReceived { prev_epoch }
286 }
287 }
288 }
289 }
290}
291
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use super::should_commit_on_checkpoint_barrier;

    /// Builds a non-zero commit interval for the test tables below.
    fn interval(n: u64) -> NonZeroU64 {
        NonZeroU64::new(n).unwrap()
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_interval() {
        // (current_checkpoint, expected): reaching the interval commits, one short does not.
        let cases = [(3, true), (2, false)];
        for (current_checkpoint, expected) in cases {
            let got = should_commit_on_checkpoint_barrier(
                current_checkpoint,
                interval(3),
                false,
                false,
                false,
            );
            assert_eq!(got, expected, "current_checkpoint = {current_checkpoint}");
        }
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_forced_events() {
        // Each forcing flag alone commits even far below the interval.
        // (vnode_bitmap_updated, is_stop, has_schema_change)
        let cases = [(true, false, false), (false, true, false), (false, false, true)];
        for (vnode_bitmap_updated, is_stop, has_schema_change) in cases {
            assert!(
                should_commit_on_checkpoint_barrier(
                    1,
                    interval(60),
                    vnode_bitmap_updated,
                    is_stop,
                    has_schema_change,
                ),
                "flags = {:?}",
                (vnode_bitmap_updated, is_stop, has_schema_change)
            );
        }
    }
}