// risingwave_connector/sink/coordinate.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::cmp::Ordering;
16use std::future::pending;
17use std::num::NonZeroU64;
18use std::time::Instant;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_pb::connector_service::SinkMetadata;
25use tracing::{info, warn};
26
27use super::{
28    LogSinker, SinkCoordinationRpcClientEnum, SinkLogReader, SinkWriterMetrics, SinkWriterParam,
29};
30use crate::sink::writer::SinkWriter;
31use crate::sink::{LogStoreReadItem, Result, SinkError, SinkParam, TruncateOffset};
32
/// A [`LogSinker`] that commits checkpoints through the sink coordinator,
/// wrapping a [`SinkWriter`] whose commit yields an optional [`SinkMetadata`].
pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
    // Inner writer that receives stream chunks and produces commit metadata.
    writer: W,
    // Client used to open a coordination stream with the coordinator service.
    sink_coordinate_client: SinkCoordinationRpcClientEnum,
    // Sink parameters; `sink_id` is used for logging and coordination.
    param: SinkParam,
    // Vnodes owned by this writer, reported when registering with the coordinator.
    vnode_bitmap: Bitmap,
    // Commit once every this many checkpoint barriers, unless a commit is forced
    // earlier (writer request, vnode bitmap update, stop, or schema change).
    commit_checkpoint_interval: NonZeroU64,
    // Metrics for this writer (e.g. commit duration observations).
    sink_writer_metrics: SinkWriterMetrics,
}
41
42impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
43    pub async fn new(
44        writer_param: &SinkWriterParam,
45        param: SinkParam,
46        writer: W,
47        commit_checkpoint_interval: NonZeroU64,
48    ) -> Result<Self> {
49        Ok(Self {
50            writer,
51            sink_coordinate_client: writer_param
52                .meta_client
53                .as_ref()
54                .ok_or_else(|| anyhow!("should have meta client"))?
55                .clone()
56                .sink_coordinate_client()
57                .await,
58            param,
59            vnode_bitmap: writer_param
60                .vnode_bitmap
61                .as_ref()
62                .ok_or_else(|| {
63                    anyhow!("sink needs coordination and should not have singleton input")
64                })?
65                .clone(),
66            commit_checkpoint_interval,
67            sink_writer_metrics: SinkWriterMetrics::new(writer_param),
68        })
69    }
70}
71
/// Decide whether the sink should commit on the current checkpoint barrier.
///
/// A commit happens either because enough checkpoint barriers have accumulated
/// to reach the configured interval, or because one of the forcing events
/// occurred: the writer asked to commit, the vnode bitmap changed, the sink is
/// stopping, or a schema change arrived.
fn should_commit_on_checkpoint_barrier(
    current_checkpoint: u64,
    commit_checkpoint_interval: NonZeroU64,
    writer_requires_commit: bool,
    vnode_bitmap_updated: bool,
    is_stop: bool,
    has_schema_change: bool,
) -> bool {
    // Any of these events forces an immediate commit regardless of interval.
    let forced_event =
        writer_requires_commit || vnode_bitmap_updated || is_stop || has_schema_change;
    // Otherwise commit only once the interval has been reached.
    let interval_reached = current_checkpoint >= commit_checkpoint_interval.get();
    interval_reached || forced_event
}
86
#[async_trait]
impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
    /// Drive the inner [`SinkWriter`] from the log store while coordinating
    /// checkpoint commits through the sink coordinator.
    ///
    /// Returns `Result<!>`: the loop only exits with an error; after a
    /// graceful stop it parks forever on `pending()`.
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        // Register with the coordinator. It may return an epoch from which the
        // log store should rewind (presumably after recovery — the coordinator
        // side decides; see `new_stream_handle`).
        let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
            .sink_coordinate_client
            .new_stream_handle(&self.param, self.vnode_bitmap)
            .await?;
        let mut sink_writer = self.writer;
        log_reader.start_from(log_store_rewind_start_epoch).await?;
        let mut first_item = log_reader.next_item().await?;
        if let (Some(log_store_rewind_start_epoch), (first_epoch, _)) =
            (log_store_rewind_start_epoch, &first_item)
        {
            // Rewind case: the first item must come strictly after the rewind
            // epoch, otherwise the log store replayed inconsistent state.
            if log_store_rewind_start_epoch >= *first_epoch {
                bail!(
                    "log_store_rewind_start_epoch {} not later than first_epoch {}",
                    log_store_rewind_start_epoch,
                    first_epoch
                );
            }
        } else {
            // No rewind epoch: align our initial epoch with the coordinator so
            // that all parallel writers begin committing from the same epoch.
            let &(initial_epoch, _) = &first_item;
            let aligned_initial_epoch = coordinator_stream_handle
                .align_initial_epoch(initial_epoch)
                .await?;
            if initial_epoch != aligned_initial_epoch {
                warn!(
                    initial_epoch,
                    aligned_initial_epoch,
                    sink_id = %self.param.sink_id,
                    "initial epoch not matched aligned initial epoch"
                );
                // Skip log items until we reach the aligned epoch, starting
                // with the item already fetched above.
                let mut peeked_first = Some(first_item);
                first_item = loop {
                    let (epoch, item) = if let Some(peeked_first) = peeked_first.take() {
                        peeked_first
                    } else {
                        log_reader.next_item().await?
                    };
                    match epoch.cmp(&aligned_initial_epoch) {
                        Ordering::Less => {
                            // Before the aligned epoch: drop this item.
                            continue;
                        }
                        Ordering::Equal => {
                            break (epoch, item);
                        }
                        Ordering::Greater => {
                            // We passed the aligned epoch without seeing it.
                            return Err(anyhow!(
                                "initial epoch {} greater than aligned initial epoch {}",
                                initial_epoch,
                                aligned_initial_epoch
                            )
                            .into());
                        }
                    }
                };
            }
        }

        // Rebind as Option so the main loop consumes the peeked item once.
        let mut first_item = Some(first_item);

        #[derive(Debug)]
        enum LogConsumerState {
            /// Mark that the log consumer is not initialized yet
            Uninitialized,

            /// Mark that a new epoch has begun.
            EpochBegun { curr_epoch: u64 },

            /// Mark that the consumer has just received a barrier
            BarrierReceived { prev_epoch: u64 },
        }

        let mut state = LogConsumerState::Uninitialized;

        // Checkpoint barriers observed since the last coordinator commit.
        let mut current_checkpoint: u64 = 0;
        let commit_checkpoint_interval = self.commit_checkpoint_interval;
        let sink_writer_metrics = self.sink_writer_metrics;

        loop {
            let (epoch, item) = if let Some(item) = first_item.take() {
                item
            } else {
                log_reader.next_item().await?
            };

            // begin_epoch when not previously began
            state = match state {
                LogConsumerState::Uninitialized => {
                    sink_writer.begin_epoch(epoch).await?;
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
                LogConsumerState::EpochBegun { curr_epoch } => {
                    // NOTE(review): when `epoch > curr_epoch` here, the new
                    // epoch is adopted without another `begin_epoch` call —
                    // presumably intentional since the epoch was already begun;
                    // confirm against the `SinkWriter` contract.
                    assert!(
                        epoch >= curr_epoch,
                        "new epoch {} should not be below the current epoch {}",
                        epoch,
                        curr_epoch
                    );
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
                LogConsumerState::BarrierReceived { prev_epoch, .. } => {
                    assert!(
                        epoch > prev_epoch,
                        "new epoch {} should be greater than prev epoch {}",
                        epoch,
                        prev_epoch
                    );

                    sink_writer.begin_epoch(epoch).await?;
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
            };
            match item {
                LogStoreReadItem::StreamChunk { chunk, .. } => {
                    // Abort the writer on a failed write so it can clean up
                    // before the error propagates.
                    if let Err(e) = sink_writer.write_batch(chunk).await {
                        sink_writer.abort().await?;
                        return Err(e);
                    }
                }
                LogStoreReadItem::Barrier {
                    is_checkpoint,
                    new_vnode_bitmap,
                    is_stop,
                    schema_change,
                } => {
                    let prev_epoch = match state {
                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
                        _ => unreachable!("epoch must have begun before handling barrier"),
                    };
                    if is_checkpoint {
                        current_checkpoint += 1;
                        if should_commit_on_checkpoint_barrier(
                            current_checkpoint,
                            commit_checkpoint_interval,
                            sink_writer.should_commit_on_checkpoint(),
                            new_vnode_bitmap.is_some(),
                            is_stop,
                            schema_change.is_some(),
                        ) {
                            let start_time = Instant::now();
                            // Committing barrier: the writer must hand back
                            // metadata for the coordinator to commit.
                            let metadata = sink_writer.barrier(true).await?;
                            let metadata = metadata.ok_or_else(|| {
                                SinkError::Coordinator(anyhow!(
                                    "should get metadata on checkpoint barrier"
                                ))
                            })?;
                            if schema_change.is_some() {
                                tracing::info!(
                                    sink_id = %self.param.sink_id,
                                    ?schema_change,
                                    "schema change received for coordinated log sinker"
                                );
                                // A schema change must arrive on a stopping
                                // barrier; otherwise an invariant is broken.
                                assert!(
                                    is_stop,
                                    "schema change should stop current sink for sink {}",
                                    self.param.sink_id
                                );
                            }
                            coordinator_stream_handle
                                .commit(epoch, metadata, schema_change)
                                .await?;
                            sink_writer_metrics
                                .sink_commit_duration
                                .observe(start_time.elapsed().as_secs_f64());

                            // Restart the interval counter after a commit.
                            current_checkpoint = 0;
                            if let Some(new_vnode_bitmap) = new_vnode_bitmap {
                                // Scaling: report the new vnode ownership; the
                                // coordinator replies with the epoch to resume
                                // from, which must equal the epoch just
                                // committed.
                                let epoch = coordinator_stream_handle
                                    .update_vnode_bitmap(&new_vnode_bitmap)
                                    .await?;
                                if epoch != prev_epoch {
                                    bail!(
                                        "newly start epoch {} after update vnode bitmap not matched with prev_epoch {}",
                                        epoch,
                                        prev_epoch
                                    );
                                }
                            }
                            if is_stop {
                                coordinator_stream_handle.stop().await?;
                                info!(
                                    sink_id = %self.param.sink_id,
                                    "coordinated log sinker stops"
                                );
                                log_reader.truncate(TruncateOffset::Barrier { epoch })?;
                                // Graceful stop: park forever, matching the
                                // `Result<!>` return type.
                                return pending().await;
                            }
                            // Committed data may be truncated from the log.
                            log_reader.truncate(TruncateOffset::Barrier { epoch })?;
                        } else {
                            // Checkpoint barrier without a commit this round; a
                            // non-committing barrier should not yield metadata.
                            let metadata = sink_writer.barrier(false).await?;
                            if let Some(metadata) = metadata {
                                warn!(?metadata, "get metadata on non-checkpoint barrier");
                            }
                        }
                    } else {
                        // Non-checkpoint barrier: flush only, no commit.
                        let metadata = sink_writer.barrier(false).await?;
                        if let Some(metadata) = metadata {
                            warn!(?metadata, "get metadata on non-checkpoint barrier");
                        }
                    }
                    state = LogConsumerState::BarrierReceived { prev_epoch }
                }
            }
        }
    }
}
294
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use super::should_commit_on_checkpoint_barrier;

    /// Shorthand: evaluate the commit decision with a plain-integer interval.
    fn decide(
        current_checkpoint: u64,
        interval: u64,
        writer_requires_commit: bool,
        vnode_bitmap_updated: bool,
        is_stop: bool,
        has_schema_change: bool,
    ) -> bool {
        should_commit_on_checkpoint_barrier(
            current_checkpoint,
            NonZeroU64::new(interval).unwrap(),
            writer_requires_commit,
            vnode_bitmap_updated,
            is_stop,
            has_schema_change,
        )
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_interval() {
        // Commits exactly when the accumulated count reaches the interval.
        assert!(decide(3, 3, false, false, false, false));
        assert!(!decide(2, 3, false, false, false, false));
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_writer_request() {
        // A writer-requested commit overrides a long interval.
        assert!(decide(1, 60, true, false, false, false));
    }

    #[test]
    fn test_should_commit_on_checkpoint_barrier_for_forced_events() {
        // Vnode bitmap update, stop, and schema change each force a commit.
        assert!(decide(1, 60, false, true, false, false));
        assert!(decide(1, 60, false, false, true, false));
        assert!(decide(1, 60, false, false, false, true));
    }
}