// risingwave_connector/sink/file_sink/batching_log_sink.rs
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use async_trait::async_trait;
16
17use crate::sink::file_sink::opendal_sink::OpenDalSinkWriter;
18use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
19use crate::sink::{LogSinker, Result, SinkLogReader};
20
/// `BatchingLogSinker` is used for a commit-decoupled sink that supports cross-barrier batching.
/// Currently, it is only used for file sinks, so it contains an `OpenDalSinkWriter`.
pub struct BatchingLogSinker {
    // The file-sink writer that this log sinker drives: chunks are handed to it
    // via `write_batch`, and `try_commit` decides when a file becomes visible
    // downstream (see the `LogSinker` impl in this file).
    writer: OpenDalSinkWriter,
}
26
27impl BatchingLogSinker {
28 /// Create a log sinker with a file sink writer.
29 pub fn new(writer: OpenDalSinkWriter) -> Self {
30 BatchingLogSinker { writer }
31 }
32}
33
34#[async_trait]
35impl LogSinker for BatchingLogSinker {
36 async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
37 log_reader.start_from(None).await?;
38 let mut sink_writer = self.writer;
39 #[derive(Debug)]
40 enum LogConsumerState {
41 /// Mark that the log consumer is not initialized yet
42 Uninitialized,
43
44 /// Mark that a new epoch has begun.
45 EpochBegun { curr_epoch: u64 },
46
47 /// Mark that the consumer has just received a barrier
48 BarrierReceived { prev_epoch: u64 },
49 }
50
51 let mut state = LogConsumerState::Uninitialized;
52 loop {
53 let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
54 // begin_epoch when not previously began
55 state = match state {
56 LogConsumerState::Uninitialized => {
57 LogConsumerState::EpochBegun { curr_epoch: epoch }
58 }
59 LogConsumerState::EpochBegun { curr_epoch } => {
60 assert!(
61 epoch >= curr_epoch,
62 "new epoch {} should not be below the current epoch {}",
63 epoch,
64 curr_epoch
65 );
66 LogConsumerState::EpochBegun { curr_epoch: epoch }
67 }
68 LogConsumerState::BarrierReceived { prev_epoch } => {
69 assert!(
70 epoch > prev_epoch,
71 "new epoch {} should be greater than prev epoch {}",
72 epoch,
73 prev_epoch
74 );
75 LogConsumerState::EpochBegun { curr_epoch: epoch }
76 }
77 };
78 match item {
79 LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
80 sink_writer.write_batch(chunk).await?;
81 if sink_writer.try_commit().await? {
82 // The file has been successfully written and is now visible to downstream consumers.
83 // Truncate up to this chunk, which also covers any preceding barriers.
84 log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
85 }
86 }
87 LogStoreReadItem::Barrier { .. } => {
88 let prev_epoch = match state {
89 LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
90 _ => unreachable!("epoch must have begun before handling barrier"),
91 };
92
93 // Truncate the barrier if either:
94 // 1. try_commit succeeded (file was written and is now visible), or
95 // 2. there is no pending data (no active writer), so the barrier can be safely discarded.
96 // This avoids accumulating barriers in the log store during idle periods with no data.
97 if sink_writer.try_commit().await? || !sink_writer.has_pending_data() {
98 log_reader.truncate(TruncateOffset::Barrier { epoch: prev_epoch })?;
99 }
100
101 state = LogConsumerState::BarrierReceived { prev_epoch }
102 }
103 }
104 }
105 }
106}