risingwave_connector/sink/file_sink/batching_log_sink.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16
17use crate::sink::file_sink::opendal_sink::OpenDalSinkWriter;
18use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
19use crate::sink::{LogSinker, Result, SinkLogReader};
20
/// `BatchingLogSinker` is used for a commit-decoupled sink that supports cross-barrier batching.
///
/// Truncation of the log store is not tied one-to-one to barriers: after every chunk and every
/// barrier the sinker calls `try_commit` on its writer, and only truncates the log store when a
/// commit actually happened (or, on a barrier, when the writer reports no pending data).
/// Currently, it is only used for file sinks, so it contains an `OpenDalSinkWriter`.
pub struct BatchingLogSinker {
    /// Writer that receives every stream chunk; its `try_commit` / `has_pending_data` results
    /// decide when the log store may be truncated.
    writer: OpenDalSinkWriter,
}
26
27impl BatchingLogSinker {
28    /// Create a log sinker with a file sink writer.
29    pub fn new(writer: OpenDalSinkWriter) -> Self {
30        BatchingLogSinker { writer }
31    }
32}
33
34#[async_trait]
35impl LogSinker for BatchingLogSinker {
36    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
37        log_reader.start_from(None).await?;
38        let mut sink_writer = self.writer;
39        #[derive(Debug)]
40        enum LogConsumerState {
41            /// Mark that the log consumer is not initialized yet
42            Uninitialized,
43
44            /// Mark that a new epoch has begun.
45            EpochBegun { curr_epoch: u64 },
46
47            /// Mark that the consumer has just received a barrier
48            BarrierReceived { prev_epoch: u64 },
49        }
50
51        let mut state = LogConsumerState::Uninitialized;
52        loop {
53            let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
54            // begin_epoch when not previously began
55            state = match state {
56                LogConsumerState::Uninitialized => {
57                    LogConsumerState::EpochBegun { curr_epoch: epoch }
58                }
59                LogConsumerState::EpochBegun { curr_epoch } => {
60                    assert!(
61                        epoch >= curr_epoch,
62                        "new epoch {} should not be below the current epoch {}",
63                        epoch,
64                        curr_epoch
65                    );
66                    LogConsumerState::EpochBegun { curr_epoch: epoch }
67                }
68                LogConsumerState::BarrierReceived { prev_epoch } => {
69                    assert!(
70                        epoch > prev_epoch,
71                        "new epoch {} should be greater than prev epoch {}",
72                        epoch,
73                        prev_epoch
74                    );
75                    LogConsumerState::EpochBegun { curr_epoch: epoch }
76                }
77            };
78            match item {
79                LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
80                    sink_writer.write_batch(chunk).await?;
81                    if sink_writer.try_commit().await? {
82                        // The file has been successfully written and is now visible to downstream consumers.
83                        // Truncate up to this chunk, which also covers any preceding barriers.
84                        log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
85                    }
86                }
87                LogStoreReadItem::Barrier { .. } => {
88                    let prev_epoch = match state {
89                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
90                        _ => unreachable!("epoch must have begun before handling barrier"),
91                    };
92
93                    // Truncate the barrier if either:
94                    // 1. try_commit succeeded (file was written and is now visible), or
95                    // 2. there is no pending data (no active writer), so the barrier can be safely discarded.
96                    // This avoids accumulating barriers in the log store during idle periods with no data.
97                    if sink_writer.try_commit().await? || !sink_writer.has_pending_data() {
98                        log_reader.truncate(TruncateOffset::Barrier { epoch: prev_epoch })?;
99                    }
100
101                    state = LogConsumerState::BarrierReceived { prev_epoch }
102                }
103            }
104        }
105    }
106}