risingwave_connector/sink/
coordinate.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::future::pending;
17use std::num::NonZeroU64;
18use std::time::Instant;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_pb::connector_service::SinkMetadata;
25use tracing::{info, warn};
26
27use super::{
28    LogSinker, SinkCoordinationRpcClientEnum, SinkLogReader, SinkWriterMetrics, SinkWriterParam,
29};
30use crate::sink::writer::SinkWriter;
31use crate::sink::{LogStoreReadItem, Result, SinkError, SinkParam, TruncateOffset};
32
33pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
34    writer: W,
35    sink_coordinate_client: SinkCoordinationRpcClientEnum,
36    param: SinkParam,
37    vnode_bitmap: Bitmap,
38    commit_checkpoint_interval: NonZeroU64,
39    sink_writer_metrics: SinkWriterMetrics,
40}
41
42impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
43    pub async fn new(
44        writer_param: &SinkWriterParam,
45        param: SinkParam,
46        writer: W,
47        commit_checkpoint_interval: NonZeroU64,
48    ) -> Result<Self> {
49        Ok(Self {
50            writer,
51            sink_coordinate_client: writer_param
52                .meta_client
53                .as_ref()
54                .ok_or_else(|| anyhow!("should have meta client"))?
55                .clone()
56                .sink_coordinate_client()
57                .await,
58            param,
59            vnode_bitmap: writer_param
60                .vnode_bitmap
61                .as_ref()
62                .ok_or_else(|| {
63                    anyhow!("sink needs coordination and should not have singleton input")
64                })?
65                .clone(),
66            commit_checkpoint_interval,
67            sink_writer_metrics: SinkWriterMetrics::new(writer_param),
68        })
69    }
70}
71
/// Decides whether a checkpoint barrier should trigger a commit to the coordinator.
///
/// A commit happens once `current_checkpoint` checkpoints have accumulated up to
/// `commit_checkpoint_interval`, or unconditionally when the barrier carries a vnode
/// bitmap update, a stop signal, or a schema change.
fn should_commit_on_checkpoint_barrier(
    current_checkpoint: u64,
    commit_checkpoint_interval: NonZeroU64,
    vnode_bitmap_updated: bool,
    is_stop: bool,
    has_schema_change: bool,
) -> bool {
    let interval_reached = commit_checkpoint_interval.get() <= current_checkpoint;
    let forced_by_event = vnode_bitmap_updated | is_stop | has_schema_change;
    interval_reached || forced_by_event
}
84
85#[async_trait]
86impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
87    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
88        let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
89            .sink_coordinate_client
90            .new_stream_handle(&self.param, self.vnode_bitmap)
91            .await?;
92        let mut sink_writer = self.writer;
93        log_reader.start_from(log_store_rewind_start_epoch).await?;
94        let mut first_item = log_reader.next_item().await?;
95        if let (Some(log_store_rewind_start_epoch), (first_epoch, _)) =
96            (log_store_rewind_start_epoch, &first_item)
97        {
98            if log_store_rewind_start_epoch >= *first_epoch {
99                bail!(
100                    "log_store_rewind_start_epoch {} not later than first_epoch {}",
101                    log_store_rewind_start_epoch,
102                    first_epoch
103                );
104            }
105        } else {
106            let &(initial_epoch, _) = &first_item;
107            let aligned_initial_epoch = coordinator_stream_handle
108                .align_initial_epoch(initial_epoch)
109                .await?;
110            if initial_epoch != aligned_initial_epoch {
111                warn!(
112                    initial_epoch,
113                    aligned_initial_epoch,
114                    sink_id = %self.param.sink_id,
115                    "initial epoch not matched aligned initial epoch"
116                );
117                let mut peeked_first = Some(first_item);
118                first_item = loop {
119                    let (epoch, item) = if let Some(peeked_first) = peeked_first.take() {
120                        peeked_first
121                    } else {
122                        log_reader.next_item().await?
123                    };
124                    match epoch.cmp(&aligned_initial_epoch) {
125                        Ordering::Less => {
126                            continue;
127                        }
128                        Ordering::Equal => {
129                            break (epoch, item);
130                        }
131                        Ordering::Greater => {
132                            return Err(anyhow!(
133                                "initial epoch {} greater than aligned initial epoch {}",
134                                initial_epoch,
135                                aligned_initial_epoch
136                            )
137                            .into());
138                        }
139                    }
140                };
141            }
142        }
143
144        let mut first_item = Some(first_item);
145
146        #[derive(Debug)]
147        enum LogConsumerState {
148            /// Mark that the log consumer is not initialized yet
149            Uninitialized,
150
151            /// Mark that a new epoch has begun.
152            EpochBegun { curr_epoch: u64 },
153
154            /// Mark that the consumer has just received a barrier
155            BarrierReceived { prev_epoch: u64 },
156        }
157
158        let mut state = LogConsumerState::Uninitialized;
159
160        let mut current_checkpoint: u64 = 0;
161        let commit_checkpoint_interval = self.commit_checkpoint_interval;
162        let sink_writer_metrics = self.sink_writer_metrics;
163
164        loop {
165            let (epoch, item) = if let Some(item) = first_item.take() {
166                item
167            } else {
168                log_reader.next_item().await?
169            };
170
171            // begin_epoch when not previously began
172            state = match state {
173                LogConsumerState::Uninitialized => {
174                    sink_writer.begin_epoch(epoch).await?;
175                    LogConsumerState::EpochBegun { curr_epoch: epoch }
176                }
177                LogConsumerState::EpochBegun { curr_epoch } => {
178                    assert!(
179                        epoch >= curr_epoch,
180                        "new epoch {} should not be below the current epoch {}",
181                        epoch,
182                        curr_epoch
183                    );
184                    LogConsumerState::EpochBegun { curr_epoch: epoch }
185                }
186                LogConsumerState::BarrierReceived { prev_epoch, .. } => {
187                    assert!(
188                        epoch > prev_epoch,
189                        "new epoch {} should be greater than prev epoch {}",
190                        epoch,
191                        prev_epoch
192                    );
193
194                    sink_writer.begin_epoch(epoch).await?;
195                    LogConsumerState::EpochBegun { curr_epoch: epoch }
196                }
197            };
198            match item {
199                LogStoreReadItem::StreamChunk { chunk, .. } => {
200                    if let Err(e) = sink_writer.write_batch(chunk).await {
201                        sink_writer.abort().await?;
202                        return Err(e);
203                    }
204                }
205                LogStoreReadItem::Barrier {
206                    is_checkpoint,
207                    new_vnode_bitmap,
208                    is_stop,
209                    schema_change,
210                } => {
211                    let prev_epoch = match state {
212                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
213                        _ => unreachable!("epoch must have begun before handling barrier"),
214                    };
215                    if is_checkpoint {
216                        current_checkpoint += 1;
217                        if should_commit_on_checkpoint_barrier(
218                            current_checkpoint,
219                            commit_checkpoint_interval,
220                            new_vnode_bitmap.is_some(),
221                            is_stop,
222                            schema_change.is_some(),
223                        ) {
224                            let start_time = Instant::now();
225                            let metadata = sink_writer.barrier(true).await?;
226                            let metadata = metadata.ok_or_else(|| {
227                                SinkError::Coordinator(anyhow!(
228                                    "should get metadata on checkpoint barrier"
229                                ))
230                            })?;
231                            if schema_change.is_some() {
232                                tracing::info!(
233                                    sink_id = %self.param.sink_id,
234                                    ?schema_change,
235                                    "schema change received for coordinated log sinker"
236                                );
237                                assert!(
238                                    is_stop,
239                                    "schema change should stop current sink for sink {}",
240                                    self.param.sink_id
241                                );
242                            }
243                            coordinator_stream_handle
244                                .commit(epoch, metadata, schema_change)
245                                .await?;
246                            sink_writer_metrics
247                                .sink_commit_duration
248                                .observe(start_time.elapsed().as_secs_f64());
249
250                            current_checkpoint = 0;
251                            if let Some(new_vnode_bitmap) = new_vnode_bitmap {
252                                let epoch = coordinator_stream_handle
253                                    .update_vnode_bitmap(&new_vnode_bitmap)
254                                    .await?;
255                                if epoch != prev_epoch {
256                                    bail!(
257                                        "newly start epoch {} after update vnode bitmap not matched with prev_epoch {}",
258                                        epoch,
259                                        prev_epoch
260                                    );
261                                }
262                            }
263                            if is_stop {
264                                coordinator_stream_handle.stop().await?;
265                                info!(
266                                    sink_id = %self.param.sink_id,
267                                    "coordinated log sinker stops"
268                                );
269                                log_reader.truncate(TruncateOffset::Barrier { epoch })?;
270                                return pending().await;
271                            }
272                            log_reader.truncate(TruncateOffset::Barrier { epoch })?;
273                        } else {
274                            let metadata = sink_writer.barrier(false).await?;
275                            if let Some(metadata) = metadata {
276                                warn!(?metadata, "get metadata on non-checkpoint barrier");
277                            }
278                        }
279                    } else {
280                        let metadata = sink_writer.barrier(false).await?;
281                        if let Some(metadata) = metadata {
282                            warn!(?metadata, "get metadata on non-checkpoint barrier");
283                        }
284                    }
285                    state = LogConsumerState::BarrierReceived { prev_epoch }
286                }
287            }
288        }
289    }
290}
291
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use super::should_commit_on_checkpoint_barrier;

    /// Builds a commit interval; test inputs are always non-zero.
    fn interval(n: u64) -> NonZeroU64 {
        NonZeroU64::new(n).expect("test interval must be non-zero")
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_interval() {
        // (current_checkpoint, commit_checkpoint_interval, expected)
        let cases = [
            (2, 3, false),
            (3, 3, true),
            // Overshooting the interval still commits.
            (4, 3, true),
            (0, 1, false),
            (1, 1, true),
        ];
        for (current_checkpoint, interval_value, expected) in cases {
            assert_eq!(
                should_commit_on_checkpoint_barrier(
                    current_checkpoint,
                    interval(interval_value),
                    false,
                    false,
                    false,
                ),
                expected,
                "current_checkpoint={current_checkpoint}, interval={interval_value}"
            );
        }
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_forced_events() {
        // Each forced event alone commits, even far below the interval.
        let forced_events = [(true, false, false), (false, true, false), (false, false, true)];
        for (vnode_bitmap_updated, is_stop, has_schema_change) in forced_events {
            assert!(
                should_commit_on_checkpoint_barrier(
                    1,
                    interval(60),
                    vnode_bitmap_updated,
                    is_stop,
                    has_schema_change,
                ),
                "vnode_bitmap_updated={vnode_bitmap_updated}, is_stop={is_stop}, \
                 has_schema_change={has_schema_change}"
            );
        }
    }
}