risingwave_connector/sink/
coordinate.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::future::pending;
17use std::num::NonZeroU64;
18use std::time::Instant;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_pb::connector_service::SinkMetadata;
25use tracing::{info, warn};
26
27use super::{
28    LogSinker, SinkCoordinationRpcClientEnum, SinkLogReader, SinkWriterMetrics, SinkWriterParam,
29};
30use crate::sink::writer::SinkWriter;
31use crate::sink::{LogStoreReadItem, Result, SinkError, SinkParam, TruncateOffset};
32
33pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
34    writer: W,
35    sink_coordinate_client: SinkCoordinationRpcClientEnum,
36    param: SinkParam,
37    vnode_bitmap: Bitmap,
38    commit_checkpoint_interval: NonZeroU64,
39    sink_writer_metrics: SinkWriterMetrics,
40}
41
42impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
43    pub async fn new(
44        writer_param: &SinkWriterParam,
45        param: SinkParam,
46        writer: W,
47        commit_checkpoint_interval: NonZeroU64,
48    ) -> Result<Self> {
49        Ok(Self {
50            writer,
51            sink_coordinate_client: writer_param
52                .meta_client
53                .as_ref()
54                .ok_or_else(|| anyhow!("should have meta client"))?
55                .clone()
56                .sink_coordinate_client()
57                .await,
58            param,
59            vnode_bitmap: writer_param
60                .vnode_bitmap
61                .as_ref()
62                .ok_or_else(|| {
63                    anyhow!("sink needs coordination and should not have singleton input")
64                })?
65                .clone(),
66            commit_checkpoint_interval,
67            sink_writer_metrics: SinkWriterMetrics::new(writer_param),
68        })
69    }
70}
71
72#[async_trait]
73impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
74    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
75        let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
76            .sink_coordinate_client
77            .new_stream_handle(&self.param, self.vnode_bitmap)
78            .await?;
79        let mut sink_writer = self.writer;
80        log_reader.start_from(log_store_rewind_start_epoch).await?;
81        let mut first_item = log_reader.next_item().await?;
82        if let (Some(log_store_rewind_start_epoch), (first_epoch, _)) =
83            (log_store_rewind_start_epoch, &first_item)
84        {
85            if log_store_rewind_start_epoch >= *first_epoch {
86                bail!(
87                    "log_store_rewind_start_epoch {} not later than first_epoch {}",
88                    log_store_rewind_start_epoch,
89                    first_epoch
90                );
91            }
92        } else {
93            let &(initial_epoch, _) = &first_item;
94            let aligned_initial_epoch = coordinator_stream_handle
95                .align_initial_epoch(initial_epoch)
96                .await?;
97            if initial_epoch != aligned_initial_epoch {
98                warn!(
99                    initial_epoch,
100                    aligned_initial_epoch,
101                    sink_id = self.param.sink_id.sink_id,
102                    "initial epoch not matched aligned initial epoch"
103                );
104                let mut peeked_first = Some(first_item);
105                first_item = loop {
106                    let (epoch, item) = if let Some(peeked_first) = peeked_first.take() {
107                        peeked_first
108                    } else {
109                        log_reader.next_item().await?
110                    };
111                    match epoch.cmp(&aligned_initial_epoch) {
112                        Ordering::Less => {
113                            continue;
114                        }
115                        Ordering::Equal => {
116                            break (epoch, item);
117                        }
118                        Ordering::Greater => {
119                            return Err(anyhow!(
120                                "initial epoch {} greater than aligned initial epoch {}",
121                                initial_epoch,
122                                aligned_initial_epoch
123                            )
124                            .into());
125                        }
126                    }
127                };
128            }
129        }
130
131        let mut first_item = Some(first_item);
132
133        #[derive(Debug)]
134        enum LogConsumerState {
135            /// Mark that the log consumer is not initialized yet
136            Uninitialized,
137
138            /// Mark that a new epoch has begun.
139            EpochBegun { curr_epoch: u64 },
140
141            /// Mark that the consumer has just received a barrier
142            BarrierReceived { prev_epoch: u64 },
143        }
144
145        let mut state = LogConsumerState::Uninitialized;
146
147        let mut current_checkpoint: u64 = 0;
148        let commit_checkpoint_interval = self.commit_checkpoint_interval;
149        let sink_writer_metrics = self.sink_writer_metrics;
150
151        loop {
152            let (epoch, item) = if let Some(item) = first_item.take() {
153                item
154            } else {
155                log_reader.next_item().await?
156            };
157
158            // begin_epoch when not previously began
159            state = match state {
160                LogConsumerState::Uninitialized => {
161                    sink_writer.begin_epoch(epoch).await?;
162                    LogConsumerState::EpochBegun { curr_epoch: epoch }
163                }
164                LogConsumerState::EpochBegun { curr_epoch } => {
165                    assert!(
166                        epoch >= curr_epoch,
167                        "new epoch {} should not be below the current epoch {}",
168                        epoch,
169                        curr_epoch
170                    );
171                    LogConsumerState::EpochBegun { curr_epoch: epoch }
172                }
173                LogConsumerState::BarrierReceived { prev_epoch, .. } => {
174                    assert!(
175                        epoch > prev_epoch,
176                        "new epoch {} should be greater than prev epoch {}",
177                        epoch,
178                        prev_epoch
179                    );
180
181                    sink_writer.begin_epoch(epoch).await?;
182                    LogConsumerState::EpochBegun { curr_epoch: epoch }
183                }
184            };
185            match item {
186                LogStoreReadItem::StreamChunk { chunk, .. } => {
187                    if let Err(e) = sink_writer.write_batch(chunk).await {
188                        sink_writer.abort().await?;
189                        return Err(e);
190                    }
191                }
192                LogStoreReadItem::Barrier {
193                    is_checkpoint,
194                    new_vnode_bitmap,
195                    is_stop,
196                    add_columns,
197                } => {
198                    let prev_epoch = match state {
199                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
200                        _ => unreachable!("epoch must have begun before handling barrier"),
201                    };
202                    if is_checkpoint {
203                        current_checkpoint += 1;
204                        if current_checkpoint >= commit_checkpoint_interval.get()
205                            || new_vnode_bitmap.is_some()
206                            || is_stop
207                            || add_columns.is_some()
208                        {
209                            let start_time = Instant::now();
210                            let metadata = sink_writer.barrier(true).await?;
211                            let metadata = metadata.ok_or_else(|| {
212                                SinkError::Coordinator(anyhow!(
213                                    "should get metadata on checkpoint barrier"
214                                ))
215                            })?;
216                            if add_columns.is_some() {
217                                assert!(
218                                    is_stop,
219                                    "add columns should stop current sink for sink {}",
220                                    self.param.sink_id
221                                );
222                            }
223                            coordinator_stream_handle
224                                .commit(epoch, metadata, add_columns)
225                                .await?;
226                            sink_writer_metrics
227                                .sink_commit_duration
228                                .observe(start_time.elapsed().as_millis() as f64);
229
230                            current_checkpoint = 0;
231                            if let Some(new_vnode_bitmap) = new_vnode_bitmap {
232                                let epoch = coordinator_stream_handle
233                                    .update_vnode_bitmap(&new_vnode_bitmap)
234                                    .await?;
235                                if epoch != prev_epoch {
236                                    bail!(
237                                        "newly start epoch {} after update vnode bitmap not matched with prev_epoch {}",
238                                        epoch,
239                                        prev_epoch
240                                    );
241                                }
242                            }
243                            if is_stop {
244                                coordinator_stream_handle.stop().await?;
245                                info!(
246                                    sink_id = self.param.sink_id.sink_id,
247                                    "coordinated log sinker stops"
248                                );
249                                log_reader.truncate(TruncateOffset::Barrier { epoch })?;
250                                return pending().await;
251                            }
252                            log_reader.truncate(TruncateOffset::Barrier { epoch })?;
253                        } else {
254                            let metadata = sink_writer.barrier(false).await?;
255                            if let Some(metadata) = metadata {
256                                warn!(?metadata, "get metadata on non-checkpoint barrier");
257                            }
258                        }
259                    } else {
260                        let metadata = sink_writer.barrier(false).await?;
261                        if let Some(metadata) = metadata {
262                            warn!(?metadata, "get metadata on non-checkpoint barrier");
263                        }
264                    }
265                    state = LogConsumerState::BarrierReceived { prev_epoch }
266                }
267            }
268        }
269    }
270}