risingwave_connector/sink/
coordinate.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::future::pending;
17use std::num::NonZeroU64;
18use std::time::Instant;
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_pb::connector_service::SinkMetadata;
25use tracing::{info, warn};
26
27use super::{
28    LogSinker, SinkCoordinationRpcClientEnum, SinkError, SinkLogReader, SinkWriterMetrics,
29    SinkWriterParam,
30};
31use crate::sink::writer::SinkWriter;
32use crate::sink::{LogStoreReadItem, Result, SinkParam, TruncateOffset};
33
34pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
35    writer: W,
36    sink_coordinate_client: SinkCoordinationRpcClientEnum,
37    param: SinkParam,
38    vnode_bitmap: Bitmap,
39    commit_checkpoint_interval: NonZeroU64,
40    sink_writer_metrics: SinkWriterMetrics,
41}
42
43impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
44    pub async fn new(
45        writer_param: &SinkWriterParam,
46        param: SinkParam,
47        writer: W,
48        commit_checkpoint_interval: NonZeroU64,
49    ) -> Result<Self> {
50        Ok(Self {
51            writer,
52            sink_coordinate_client: writer_param
53                .meta_client
54                .as_ref()
55                .ok_or_else(|| anyhow!("should have meta client"))?
56                .clone()
57                .sink_coordinate_client()
58                .await,
59            param,
60            vnode_bitmap: writer_param
61                .vnode_bitmap
62                .as_ref()
63                .ok_or_else(|| {
64                    anyhow!("sink needs coordination and should not have singleton input")
65                })?
66                .clone(),
67            commit_checkpoint_interval,
68            sink_writer_metrics: SinkWriterMetrics::new(writer_param),
69        })
70    }
71}
72
73#[async_trait]
74impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
75    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
76        let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
77            .sink_coordinate_client
78            .new_stream_handle(&self.param, self.vnode_bitmap)
79            .await?;
80        let mut sink_writer = self.writer;
81        log_reader.start_from(log_store_rewind_start_epoch).await?;
82        let mut first_item = log_reader.next_item().await?;
83        if let (Some(log_store_rewind_start_epoch), (first_epoch, _)) =
84            (log_store_rewind_start_epoch, &first_item)
85        {
86            if log_store_rewind_start_epoch >= *first_epoch {
87                bail!(
88                    "log_store_rewind_start_epoch {} not later than first_epoch {}",
89                    log_store_rewind_start_epoch,
90                    first_epoch
91                );
92            }
93        } else {
94            let &(initial_epoch, _) = &first_item;
95            let aligned_initial_epoch = coordinator_stream_handle
96                .align_initial_epoch(initial_epoch)
97                .await?;
98            if initial_epoch != aligned_initial_epoch {
99                warn!(
100                    initial_epoch,
101                    aligned_initial_epoch,
102                    sink_id = self.param.sink_id.sink_id,
103                    "initial epoch not matched aligned initial epoch"
104                );
105                let mut peeked_first = Some(first_item);
106                first_item = loop {
107                    let (epoch, item) = if let Some(peeked_first) = peeked_first.take() {
108                        peeked_first
109                    } else {
110                        log_reader.next_item().await?
111                    };
112                    match epoch.cmp(&aligned_initial_epoch) {
113                        Ordering::Less => {
114                            continue;
115                        }
116                        Ordering::Equal => {
117                            break (epoch, item);
118                        }
119                        Ordering::Greater => {
120                            return Err(anyhow!(
121                                "initial epoch {} greater than aligned initial epoch {}",
122                                initial_epoch,
123                                aligned_initial_epoch
124                            )
125                            .into());
126                        }
127                    }
128                };
129            }
130        }
131
132        let mut first_item = Some(first_item);
133
134        #[derive(Debug)]
135        enum LogConsumerState {
136            /// Mark that the log consumer is not initialized yet
137            Uninitialized,
138
139            /// Mark that a new epoch has begun.
140            EpochBegun { curr_epoch: u64 },
141
142            /// Mark that the consumer has just received a barrier
143            BarrierReceived { prev_epoch: u64 },
144        }
145
146        let mut state = LogConsumerState::Uninitialized;
147
148        let mut current_checkpoint: u64 = 0;
149        let commit_checkpoint_interval = self.commit_checkpoint_interval;
150        let sink_writer_metrics = self.sink_writer_metrics;
151
152        loop {
153            let (epoch, item) = if let Some(item) = first_item.take() {
154                item
155            } else {
156                log_reader.next_item().await?
157            };
158
159            // begin_epoch when not previously began
160            state = match state {
161                LogConsumerState::Uninitialized => {
162                    sink_writer.begin_epoch(epoch).await?;
163                    LogConsumerState::EpochBegun { curr_epoch: epoch }
164                }
165                LogConsumerState::EpochBegun { curr_epoch } => {
166                    assert!(
167                        epoch >= curr_epoch,
168                        "new epoch {} should not be below the current epoch {}",
169                        epoch,
170                        curr_epoch
171                    );
172                    LogConsumerState::EpochBegun { curr_epoch: epoch }
173                }
174                LogConsumerState::BarrierReceived { prev_epoch, .. } => {
175                    assert!(
176                        epoch > prev_epoch,
177                        "new epoch {} should be greater than prev epoch {}",
178                        epoch,
179                        prev_epoch
180                    );
181
182                    sink_writer.begin_epoch(epoch).await?;
183                    LogConsumerState::EpochBegun { curr_epoch: epoch }
184                }
185            };
186            match item {
187                LogStoreReadItem::StreamChunk { chunk, .. } => {
188                    if let Err(e) = sink_writer.write_batch(chunk).await {
189                        sink_writer.abort().await?;
190                        return Err(e);
191                    }
192                }
193                LogStoreReadItem::Barrier {
194                    is_checkpoint,
195                    new_vnode_bitmap,
196                    is_stop,
197                } => {
198                    let prev_epoch = match state {
199                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
200                        _ => unreachable!("epoch must have begun before handling barrier"),
201                    };
202                    if is_checkpoint {
203                        current_checkpoint += 1;
204                        if current_checkpoint >= commit_checkpoint_interval.get()
205                            || new_vnode_bitmap.is_some()
206                            || is_stop
207                        {
208                            let start_time = Instant::now();
209                            let metadata = sink_writer.barrier(true).await?;
210                            let metadata = metadata.ok_or_else(|| {
211                                SinkError::Coordinator(anyhow!(
212                                    "should get metadata on checkpoint barrier"
213                                ))
214                            })?;
215                            coordinator_stream_handle.commit(epoch, metadata).await?;
216                            sink_writer_metrics
217                                .sink_commit_duration
218                                .observe(start_time.elapsed().as_millis() as f64);
219
220                            current_checkpoint = 0;
221                            if let Some(new_vnode_bitmap) = new_vnode_bitmap {
222                                let epoch = coordinator_stream_handle
223                                    .update_vnode_bitmap(&new_vnode_bitmap)
224                                    .await?;
225                                if epoch != prev_epoch {
226                                    bail!(
227                                        "newly start epoch {} after update vnode bitmap not matched with prev_epoch {}",
228                                        epoch,
229                                        prev_epoch
230                                    );
231                                }
232                            }
233                            if is_stop {
234                                coordinator_stream_handle.stop().await?;
235                                info!(
236                                    sink_id = self.param.sink_id.sink_id,
237                                    "coordinated log sinker stops"
238                                );
239                                log_reader.truncate(TruncateOffset::Barrier { epoch })?;
240                                return pending().await;
241                            }
242                            log_reader.truncate(TruncateOffset::Barrier { epoch })?;
243                        } else {
244                            let metadata = sink_writer.barrier(false).await?;
245                            if let Some(metadata) = metadata {
246                                warn!(?metadata, "get metadata on non-checkpoint barrier");
247                            }
248                        }
249                    } else {
250                        let metadata = sink_writer.barrier(false).await?;
251                        if let Some(metadata) = metadata {
252                            warn!(?metadata, "get metadata on non-checkpoint barrier");
253                        }
254                    }
255                    state = LogConsumerState::BarrierReceived { prev_epoch }
256                }
257            }
258        }
259    }
260}