risingwave_connector/sink/file_sink/batching_log_sink.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16
17use crate::sink::file_sink::opendal_sink::OpenDalSinkWriter;
18use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
19use crate::sink::{LogSinker, Result, SinkLogReader};
20
21/// `BatchingLogSinker` is used for a commit-decoupled sink that supports cross-barrier batching.
22/// Currently, it is only used for file sinks, so it contains an `OpenDalSinkWriter`.
23pub struct BatchingLogSinker {
24 writer: OpenDalSinkWriter,
25}
26
27impl BatchingLogSinker {
28 /// Create a log sinker with a file sink writer.
29 pub fn new(writer: OpenDalSinkWriter) -> Self {
30 BatchingLogSinker { writer }
31 }
32}
33
34#[async_trait]
35impl LogSinker for BatchingLogSinker {
36 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
37 log_reader.start_from(None).await?;
38 let mut sink_writer = self.writer;
39 #[derive(Debug)]
40 enum LogConsumerState {
41 /// Mark that the log consumer is not initialized yet
42 Uninitialized,
43
44 /// Mark that a new epoch has begun, and store the max_uncommitted_epoch for cross-barrier batching.
45 /// For example, suppose the current order is (chunk1, barrier1, chunk2, barrier2, chunk3), and the batching is not completed until chunk3,
46 /// that is, barrier2 and its previous chunks are not truncated, the `max_uncommitted_epoch` is barrier2.
47 /// When we truncate chunk3, we should first truncate barrier2, and then truncate chunk3.
48 EpochBegun {
49 curr_epoch: u64,
50 max_uncommitted_epoch: Option<u64>,
51 },
52
53 /// Mark that the consumer has just received a barrier
54 BarrierReceived { prev_epoch: u64 },
55 }
56
57 let mut state = LogConsumerState::Uninitialized;
58 loop {
59 let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
60 // begin_epoch when not previously began
61 state = match state {
62 LogConsumerState::Uninitialized => LogConsumerState::EpochBegun {
63 curr_epoch: epoch,
64 max_uncommitted_epoch: None,
65 },
66 LogConsumerState::EpochBegun {
67 curr_epoch,
68 max_uncommitted_epoch,
69 } => {
70 assert!(
71 epoch >= curr_epoch,
72 "new epoch {} should not be below the current epoch {}",
73 epoch,
74 curr_epoch
75 );
76 LogConsumerState::EpochBegun {
77 curr_epoch: epoch,
78 max_uncommitted_epoch,
79 }
80 }
81 LogConsumerState::BarrierReceived { prev_epoch } => {
82 assert!(
83 epoch > prev_epoch,
84 "new epoch {} should be greater than prev epoch {}",
85 epoch,
86 prev_epoch
87 );
88 LogConsumerState::EpochBegun {
89 curr_epoch: epoch,
90 max_uncommitted_epoch: Some(prev_epoch),
91 }
92 }
93 };
94 match item {
95 LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
96 sink_writer.write_batch(chunk).await?;
97 match sink_writer.try_commit().await {
98 Err(e) => {
99 return Err(e);
100 }
101 // The file has been successfully written and is now visible to downstream consumers.
102 // Truncate the file to remove the specified `chunk_id` and any preceding content.
103 Ok(true) => {
104 // If epoch increased, we first need to truncate the previous epoch.
105 if let Some(max_uncommitted_epoch) = match state {
106 LogConsumerState::EpochBegun {
107 curr_epoch: _,
108 max_uncommitted_epoch,
109 } => max_uncommitted_epoch,
110 _ => unreachable!("epoch must have begun before handling barrier"),
111 } {
112 assert!(epoch > max_uncommitted_epoch);
113 log_reader.truncate(TruncateOffset::Barrier {
114 epoch: max_uncommitted_epoch,
115 })?;
116 state = LogConsumerState::EpochBegun {
117 curr_epoch: epoch,
118 max_uncommitted_epoch: None,
119 }
120 };
121
122 log_reader.truncate(TruncateOffset::Chunk {
123 epoch: (epoch),
124 chunk_id: (chunk_id),
125 })?;
126 }
127 // The file has not been written into downstream file system.
128 Ok(false) => {}
129 }
130 }
131 LogStoreReadItem::Barrier { .. } => {
132 let prev_epoch = match state {
133 LogConsumerState::EpochBegun {
134 curr_epoch,
135 max_uncommitted_epoch: _,
136 } => curr_epoch,
137 _ => unreachable!("epoch must have begun before handling barrier"),
138 };
139
140 // When the barrier arrives, call the writer's try_finish interface to check if the file write can be completed.
141 // If it is completed, which means the file is visible in the downstream file system, then truncate the file in the log store; otherwise, do nothing.
142 // Since the current data must be before the current epoch, we only need to truncate `prev_epoch`.
143 if sink_writer.try_commit().await? {
144 log_reader.truncate(TruncateOffset::Barrier { epoch: prev_epoch })?;
145 };
146
147 state = LogConsumerState::BarrierReceived { prev_epoch }
148 }
149 }
150 }
151 }
152}