risingwave_connector/sink/file_sink/batching_log_sink.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16
17use crate::sink::file_sink::opendal_sink::OpenDalSinkWriter;
18use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
19use crate::sink::{LogSinker, Result, SinkLogReader};
20
21/// `BatchingLogSinker` is used for a commit-decoupled sink that supports cross-barrier batching.
22/// Currently, it is only used for file sinks, so it contains an `OpenDalSinkWriter`.
23pub struct BatchingLogSinker {
24    writer: OpenDalSinkWriter,
25}
26
27impl BatchingLogSinker {
28    /// Create a log sinker with a file sink writer.
29    pub fn new(writer: OpenDalSinkWriter) -> Self {
30        BatchingLogSinker { writer }
31    }
32}
33
34#[async_trait]
35impl LogSinker for BatchingLogSinker {
36    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
37        log_reader.start_from(None).await?;
38        let mut sink_writer = self.writer;
39        #[derive(Debug)]
40        enum LogConsumerState {
41            /// Mark that the log consumer is not initialized yet
42            Uninitialized,
43
44            /// Mark that a new epoch has begun, and store the max_uncommitted_epoch for cross-barrier batching.
45            /// For example, suppose the current order is (chunk1, barrier1, chunk2, barrier2, chunk3), and the batching is not completed until chunk3,
46            /// that is, barrier2 and its previous chunks are not truncated, the `max_uncommitted_epoch` is barrier2.
47            /// When we truncate chunk3, we should first truncate barrier2, and then truncate chunk3.
48            EpochBegun {
49                curr_epoch: u64,
50                max_uncommitted_epoch: Option<u64>,
51            },
52
53            /// Mark that the consumer has just received a barrier
54            BarrierReceived { prev_epoch: u64 },
55        }
56
57        let mut state = LogConsumerState::Uninitialized;
58        loop {
59            let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
60            // begin_epoch when not previously began
61            state = match state {
62                LogConsumerState::Uninitialized => LogConsumerState::EpochBegun {
63                    curr_epoch: epoch,
64                    max_uncommitted_epoch: None,
65                },
66                LogConsumerState::EpochBegun {
67                    curr_epoch,
68                    max_uncommitted_epoch,
69                } => {
70                    assert!(
71                        epoch >= curr_epoch,
72                        "new epoch {} should not be below the current epoch {}",
73                        epoch,
74                        curr_epoch
75                    );
76                    LogConsumerState::EpochBegun {
77                        curr_epoch: epoch,
78                        max_uncommitted_epoch,
79                    }
80                }
81                LogConsumerState::BarrierReceived { prev_epoch } => {
82                    assert!(
83                        epoch > prev_epoch,
84                        "new epoch {} should be greater than prev epoch {}",
85                        epoch,
86                        prev_epoch
87                    );
88                    LogConsumerState::EpochBegun {
89                        curr_epoch: epoch,
90                        max_uncommitted_epoch: Some(prev_epoch),
91                    }
92                }
93            };
94            match item {
95                LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
96                    sink_writer.write_batch(chunk).await?;
97                    match sink_writer.try_commit().await {
98                        Err(e) => {
99                            return Err(e);
100                        }
101                        // The file has been successfully written and is now visible to downstream consumers.
102                        // Truncate the file to remove the specified `chunk_id` and any preceding content.
103                        Ok(true) => {
104                            // If epoch increased, we first need to truncate the previous epoch.
105                            if let Some(max_uncommitted_epoch) = match state {
106                                LogConsumerState::EpochBegun {
107                                    curr_epoch: _,
108                                    max_uncommitted_epoch,
109                                } => max_uncommitted_epoch,
110                                _ => unreachable!("epoch must have begun before handling barrier"),
111                            } {
112                                assert!(epoch > max_uncommitted_epoch);
113                                log_reader.truncate(TruncateOffset::Barrier {
114                                    epoch: max_uncommitted_epoch,
115                                })?;
116                                state = LogConsumerState::EpochBegun {
117                                    curr_epoch: epoch,
118                                    max_uncommitted_epoch: None,
119                                }
120                            };
121
122                            log_reader.truncate(TruncateOffset::Chunk {
123                                epoch: (epoch),
124                                chunk_id: (chunk_id),
125                            })?;
126                        }
127                        // The file has not been written into downstream file system.
128                        Ok(false) => {}
129                    }
130                }
131                LogStoreReadItem::Barrier { .. } => {
132                    let prev_epoch = match state {
133                        LogConsumerState::EpochBegun {
134                            curr_epoch,
135                            max_uncommitted_epoch: _,
136                        } => curr_epoch,
137                        _ => unreachable!("epoch must have begun before handling barrier"),
138                    };
139
140                    // When the barrier arrives, call the writer's try_finish interface to check if the file write can be completed.
141                    // If it is completed, which means the file is visible in the downstream file system, then truncate the file in the log store; otherwise, do nothing.
142                    // Since the current data must be before the current epoch, we only need to truncate `prev_epoch`.
143                    if sink_writer.try_commit().await? {
144                        log_reader.truncate(TruncateOffset::Barrier { epoch: prev_epoch })?;
145                    };
146
147                    state = LogConsumerState::BarrierReceived { prev_epoch }
148                }
149            }
150        }
151    }
152}