risingwave_connector/sink/
coordinate.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::future::pending;
16use std::num::NonZeroU64;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use async_trait::async_trait;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_pb::connector_service::SinkMetadata;
23use tracing::{info, warn};
24
25use super::{
26    LogSinker, SinkCoordinationRpcClientEnum, SinkError, SinkLogReader, SinkWriterMetrics,
27    SinkWriterParam,
28};
29use crate::sink::writer::SinkWriter;
30use crate::sink::{LogStoreReadItem, Result, SinkParam, TruncateOffset};
31
32pub struct CoordinatedLogSinker<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
33    writer: W,
34    sink_coordinate_client: SinkCoordinationRpcClientEnum,
35    param: SinkParam,
36    vnode_bitmap: Bitmap,
37    commit_checkpoint_interval: NonZeroU64,
38    sink_writer_metrics: SinkWriterMetrics,
39}
40
41impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedLogSinker<W> {
42    pub async fn new(
43        writer_param: &SinkWriterParam,
44        param: SinkParam,
45        writer: W,
46        commit_checkpoint_interval: NonZeroU64,
47    ) -> Result<Self> {
48        Ok(Self {
49            writer,
50            sink_coordinate_client: writer_param
51                .meta_client
52                .as_ref()
53                .ok_or_else(|| anyhow!("should have meta client"))?
54                .clone()
55                .sink_coordinate_client()
56                .await,
57            param,
58            vnode_bitmap: writer_param
59                .vnode_bitmap
60                .as_ref()
61                .ok_or_else(|| {
62                    anyhow!("sink needs coordination and should not have singleton input")
63                })?
64                .clone(),
65            commit_checkpoint_interval,
66            sink_writer_metrics: SinkWriterMetrics::new(writer_param),
67        })
68    }
69}
70
71#[async_trait]
72impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for CoordinatedLogSinker<W> {
73    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
74        let (mut coordinator_stream_handle, log_store_rewind_start_epoch) = self
75            .sink_coordinate_client
76            .new_stream_handle(&self.param, self.vnode_bitmap)
77            .await?;
78        let mut sink_writer = self.writer;
79        log_reader.start_from(log_store_rewind_start_epoch).await?;
80        #[derive(Debug)]
81        enum LogConsumerState {
82            /// Mark that the log consumer is not initialized yet
83            Uninitialized,
84
85            /// Mark that a new epoch has begun.
86            EpochBegun { curr_epoch: u64 },
87
88            /// Mark that the consumer has just received a barrier
89            BarrierReceived { prev_epoch: u64 },
90        }
91
92        let mut state = LogConsumerState::Uninitialized;
93
94        let mut current_checkpoint: u64 = 0;
95        let commit_checkpoint_interval = self.commit_checkpoint_interval;
96        let sink_writer_metrics = self.sink_writer_metrics;
97
98        loop {
99            let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
100            // begin_epoch when not previously began
101            state = match state {
102                LogConsumerState::Uninitialized => {
103                    sink_writer.begin_epoch(epoch).await?;
104                    LogConsumerState::EpochBegun { curr_epoch: epoch }
105                }
106                LogConsumerState::EpochBegun { curr_epoch } => {
107                    assert!(
108                        epoch >= curr_epoch,
109                        "new epoch {} should not be below the current epoch {}",
110                        epoch,
111                        curr_epoch
112                    );
113                    LogConsumerState::EpochBegun { curr_epoch: epoch }
114                }
115                LogConsumerState::BarrierReceived { prev_epoch, .. } => {
116                    assert!(
117                        epoch > prev_epoch,
118                        "new epoch {} should be greater than prev epoch {}",
119                        epoch,
120                        prev_epoch
121                    );
122
123                    sink_writer.begin_epoch(epoch).await?;
124                    LogConsumerState::EpochBegun { curr_epoch: epoch }
125                }
126            };
127            match item {
128                LogStoreReadItem::StreamChunk { chunk, .. } => {
129                    if let Err(e) = sink_writer.write_batch(chunk).await {
130                        sink_writer.abort().await?;
131                        return Err(e);
132                    }
133                }
134                LogStoreReadItem::Barrier {
135                    is_checkpoint,
136                    new_vnode_bitmap,
137                    is_stop,
138                } => {
139                    let prev_epoch = match state {
140                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
141                        _ => unreachable!("epoch must have begun before handling barrier"),
142                    };
143                    if is_checkpoint {
144                        current_checkpoint += 1;
145                        if current_checkpoint >= commit_checkpoint_interval.get()
146                            || new_vnode_bitmap.is_some()
147                            || is_stop
148                        {
149                            let start_time = Instant::now();
150                            let metadata = sink_writer.barrier(true).await?;
151                            let metadata = metadata.ok_or_else(|| {
152                                SinkError::Coordinator(anyhow!(
153                                    "should get metadata on checkpoint barrier"
154                                ))
155                            })?;
156                            coordinator_stream_handle.commit(epoch, metadata).await?;
157                            sink_writer_metrics
158                                .sink_commit_duration
159                                .observe(start_time.elapsed().as_millis() as f64);
160                            log_reader.truncate(TruncateOffset::Barrier { epoch })?;
161
162                            current_checkpoint = 0;
163                            if let Some(new_vnode_bitmap) = new_vnode_bitmap {
164                                coordinator_stream_handle
165                                    .update_vnode_bitmap(&new_vnode_bitmap)
166                                    .await?;
167                            }
168                            if is_stop {
169                                coordinator_stream_handle.stop().await?;
170                                info!(
171                                    sink_id = self.param.sink_id.sink_id,
172                                    "coordinated log sinker stops"
173                                );
174                                return pending().await;
175                            }
176                        } else {
177                            let metadata = sink_writer.barrier(false).await?;
178                            if let Some(metadata) = metadata {
179                                warn!(?metadata, "get metadata on non-checkpoint barrier");
180                            }
181                        }
182                    } else {
183                        let metadata = sink_writer.barrier(false).await?;
184                        if let Some(metadata) = metadata {
185                            warn!(?metadata, "get metadata on non-checkpoint barrier");
186                        }
187                    }
188                    state = LogConsumerState::BarrierReceived { prev_epoch }
189                }
190            }
191        }
192    }
193}