risingwave_connector/sink/
decouple_checkpoint_log_sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroU64;
16use std::time::Instant;
17
18use async_trait::async_trait;
19
20use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
21use crate::sink::writer::SinkWriter;
22use crate::sink::{LogSinker, Result, SinkLogReader, SinkWriterMetrics};
23
/// Default commit checkpoint interval used when sink decoupling is enabled.
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10;
/// Default commit checkpoint interval used when sink decoupling is disabled.
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;
/// Configuration key under which the commit checkpoint interval is supplied
/// (presumably a sink property; confirm against the option parser).
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";

/// Returns the default commit checkpoint interval.
///
/// This is the sink-decouple default,
/// [`DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE`].
pub fn default_commit_checkpoint_interval() -> u64 {
    let interval = DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE;
    interval
}
31
32/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
33/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader,
34/// we delay the checkpoint barrier to make commits less frequent.
35pub struct DecoupleCheckpointLogSinkerOf<W> {
36    writer: W,
37    sink_writer_metrics: SinkWriterMetrics,
38    commit_checkpoint_interval: NonZeroU64,
39}
40
41impl<W> DecoupleCheckpointLogSinkerOf<W> {
42    /// Create a log sinker with a commit checkpoint interval. The sinker should be used with a
43    /// decouple log reader `KvLogStoreReader`.
44    pub fn new(
45        writer: W,
46        sink_writer_metrics: SinkWriterMetrics,
47        commit_checkpoint_interval: NonZeroU64,
48    ) -> Self {
49        DecoupleCheckpointLogSinkerOf {
50            writer,
51            sink_writer_metrics,
52            commit_checkpoint_interval,
53        }
54    }
55}
56
57#[async_trait]
58impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
59    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
60        let mut sink_writer = self.writer;
61        log_reader.start_from(None).await?;
62        #[derive(Debug)]
63        enum LogConsumerState {
64            /// Mark that the log consumer is not initialized yet
65            Uninitialized,
66
67            /// Mark that a new epoch has begun.
68            EpochBegun { curr_epoch: u64 },
69
70            /// Mark that the consumer has just received a barrier
71            BarrierReceived { prev_epoch: u64 },
72        }
73
74        let mut state = LogConsumerState::Uninitialized;
75
76        let mut current_checkpoint: u64 = 0;
77        let commit_checkpoint_interval = self.commit_checkpoint_interval;
78        let sink_writer_metrics = self.sink_writer_metrics;
79
80        loop {
81            let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
82            // begin_epoch when not previously began
83            state = match state {
84                LogConsumerState::Uninitialized => {
85                    sink_writer.begin_epoch(epoch).await?;
86                    LogConsumerState::EpochBegun { curr_epoch: epoch }
87                }
88                LogConsumerState::EpochBegun { curr_epoch } => {
89                    assert!(
90                        epoch >= curr_epoch,
91                        "new epoch {} should not be below the current epoch {}",
92                        epoch,
93                        curr_epoch
94                    );
95                    LogConsumerState::EpochBegun { curr_epoch: epoch }
96                }
97                LogConsumerState::BarrierReceived { prev_epoch, .. } => {
98                    assert!(
99                        epoch > prev_epoch,
100                        "new epoch {} should be greater than prev epoch {}",
101                        epoch,
102                        prev_epoch
103                    );
104
105                    sink_writer.begin_epoch(epoch).await?;
106                    LogConsumerState::EpochBegun { curr_epoch: epoch }
107                }
108            };
109            match item {
110                LogStoreReadItem::StreamChunk { chunk, .. } => {
111                    if let Err(e) = sink_writer.write_batch(chunk).await {
112                        sink_writer.abort().await?;
113                        return Err(e);
114                    }
115                }
116                LogStoreReadItem::Barrier {
117                    is_checkpoint,
118                    new_vnode_bitmap,
119                    ..
120                } => {
121                    let prev_epoch = match state {
122                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
123                        _ => unreachable!("epoch must have begun before handling barrier"),
124                    };
125                    if is_checkpoint {
126                        current_checkpoint += 1;
127                        if current_checkpoint >= commit_checkpoint_interval.get()
128                            || new_vnode_bitmap.is_some()
129                        {
130                            let start_time = Instant::now();
131                            sink_writer.barrier(true).await?;
132                            sink_writer_metrics
133                                .sink_commit_duration
134                                .observe(start_time.elapsed().as_millis() as f64);
135                            log_reader.truncate(TruncateOffset::Barrier { epoch })?;
136
137                            current_checkpoint = 0;
138                        } else {
139                            sink_writer.barrier(false).await?;
140                        }
141                    } else {
142                        assert!(new_vnode_bitmap.is_none());
143                        sink_writer.barrier(false).await?;
144                    }
145                    state = LogConsumerState::BarrierReceived { prev_epoch }
146                }
147            }
148        }
149    }
150}