// risingwave_connector/sink/decouple_checkpoint_log_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::num::NonZeroU64;
16use std::time::Instant;
17
18use async_trait::async_trait;
19
20use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
21use crate::sink::writer::SinkWriter;
22use crate::sink::{LogSinker, Result, SinkLogReader, SinkWriterMetrics};
23
/// Default commit checkpoint interval for sinks running with sink decouple
/// (the value returned by [`default_commit_checkpoint_interval`]).
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10;

/// Default commit checkpoint interval for sinks running without sink decouple.
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;

/// Returns [`DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE`].
///
/// Provided as a function so the default can be referenced where a callable is expected
/// (presumably a `serde(default = "...")` attribute — confirm at the call sites).
pub fn default_commit_checkpoint_interval() -> u64 {
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE
}

/// Default commit checkpoint interval for Iceberg sinks
/// (the value returned by [`iceberg_default_commit_checkpoint_interval`]).
pub const ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 60;

/// Returns [`ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL`].
pub fn iceberg_default_commit_checkpoint_interval() -> u64 {
    ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL
}

/// The textual key `"commit_checkpoint_interval"`; presumably the sink option name
/// under which the interval is configured (verify against option parsing code).
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";

38/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
39/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader,
40/// we delay the checkpoint barrier to make commits less frequent.
41pub struct DecoupleCheckpointLogSinkerOf<W> {
42    writer: W,
43    sink_writer_metrics: SinkWriterMetrics,
44    commit_checkpoint_interval: NonZeroU64,
45}
46
47impl<W> DecoupleCheckpointLogSinkerOf<W> {
48    /// Create a log sinker with a commit checkpoint interval. The sinker should be used with a
49    /// decouple log reader `KvLogStoreReader`.
50    pub fn new(
51        writer: W,
52        sink_writer_metrics: SinkWriterMetrics,
53        commit_checkpoint_interval: NonZeroU64,
54    ) -> Self {
55        DecoupleCheckpointLogSinkerOf {
56            writer,
57            sink_writer_metrics,
58            commit_checkpoint_interval,
59        }
60    }
61}
62
63#[async_trait]
64impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
65    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
66        let mut sink_writer = self.writer;
67        log_reader.start_from(None).await?;
68        #[derive(Debug)]
69        enum LogConsumerState {
70            /// Mark that the log consumer is not initialized yet
71            Uninitialized,
72
73            /// Mark that a new epoch has begun.
74            EpochBegun { curr_epoch: u64 },
75
76            /// Mark that the consumer has just received a barrier
77            BarrierReceived { prev_epoch: u64 },
78        }
79
80        let mut state = LogConsumerState::Uninitialized;
81
82        let mut current_checkpoint: u64 = 0;
83        let commit_checkpoint_interval = self.commit_checkpoint_interval;
84        let sink_writer_metrics = self.sink_writer_metrics;
85
86        loop {
87            let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
88            // begin_epoch when not previously began
89            state = match state {
90                LogConsumerState::Uninitialized => {
91                    sink_writer.begin_epoch(epoch).await?;
92                    LogConsumerState::EpochBegun { curr_epoch: epoch }
93                }
94                LogConsumerState::EpochBegun { curr_epoch } => {
95                    assert!(
96                        epoch >= curr_epoch,
97                        "new epoch {} should not be below the current epoch {}",
98                        epoch,
99                        curr_epoch
100                    );
101                    LogConsumerState::EpochBegun { curr_epoch: epoch }
102                }
103                LogConsumerState::BarrierReceived { prev_epoch, .. } => {
104                    assert!(
105                        epoch > prev_epoch,
106                        "new epoch {} should be greater than prev epoch {}",
107                        epoch,
108                        prev_epoch
109                    );
110
111                    sink_writer.begin_epoch(epoch).await?;
112                    LogConsumerState::EpochBegun { curr_epoch: epoch }
113                }
114            };
115            match item {
116                LogStoreReadItem::StreamChunk { chunk, .. } => {
117                    if let Err(e) = sink_writer.write_batch(chunk).await {
118                        sink_writer.abort().await?;
119                        return Err(e);
120                    }
121                }
122                LogStoreReadItem::Barrier {
123                    is_checkpoint,
124                    new_vnode_bitmap,
125                    ..
126                } => {
127                    let prev_epoch = match state {
128                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
129                        _ => unreachable!("epoch must have begun before handling barrier"),
130                    };
131                    if is_checkpoint {
132                        current_checkpoint += 1;
133                        if current_checkpoint >= commit_checkpoint_interval.get()
134                            || new_vnode_bitmap.is_some()
135                        {
136                            let start_time = Instant::now();
137                            sink_writer.barrier(true).await?;
138                            sink_writer_metrics
139                                .sink_commit_duration
140                                .observe(start_time.elapsed().as_millis() as f64);
141                            log_reader.truncate(TruncateOffset::Barrier { epoch })?;
142
143                            current_checkpoint = 0;
144                        } else {
145                            sink_writer.barrier(false).await?;
146                        }
147                    } else {
148                        assert!(new_vnode_bitmap.is_none());
149                        sink_writer.barrier(false).await?;
150                    }
151                    state = LogConsumerState::BarrierReceived { prev_epoch }
152                }
153            }
154        }
155    }
156}