risingwave_connector/sink/writer.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::future::{Future, Ready};
16use std::pin::pin;
17use std::time::Instant;
18
19use async_trait::async_trait;
20use futures::TryFuture;
21use futures::future::{Either, select};
22use risingwave_common::array::StreamChunk;
23use rw_futures_util::drop_either_future;
24
25use crate::sink::encoder::SerTo;
26use crate::sink::formatter::SinkFormatter;
27use crate::sink::log_store::{
28    DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogStoreReadItem, TruncateOffset,
29};
30use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkWriterMetrics};
31
/// A sink writer that is driven epoch by epoch.
///
/// When driven by `LogSinkerOf`, the call sequence for each epoch is `begin_epoch`, then zero or
/// more `write_batch` calls, then `barrier` to close the epoch.
#[async_trait]
pub trait SinkWriter: Send + 'static {
    /// Value returned from [`SinkWriter::barrier`]; defaults to `()`.
    type CommitMetadata: Send = ();
    /// Begin a new epoch
    async fn begin_epoch(&mut self, epoch: u64) -> Result<()>;

    /// Write a stream chunk to sink
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>;

    /// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
    /// writer should commit the current epoch.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata>;

    /// Clean up after a failed write.
    ///
    /// `LogSinkerOf` calls this when `write_batch` returns an error. The default implementation
    /// does nothing and succeeds.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
}
50
/// An already-resolved delivery future (a `Ready` value).
///
/// This is the default [`AsyncTruncateSinkWriter::DeliveryFuture`] for writers that do not
/// override it.
pub type DummyDeliveryFuture = Ready<std::result::Result<(), SinkError>>;
52
/// A sink writer that can register delivery futures for the chunks it writes.
///
/// `AsyncTruncateLogSinkerOf` passes each chunk to `write_chunk` together with a
/// `DeliveryFutureManagerAddFuture` handle. It truncates the log only to offsets reported by its
/// `DeliveryFutureManager`, presumably so that delivery can finish after `write_chunk` returns.
pub trait AsyncTruncateSinkWriter: Send + 'static {
    /// Future that resolves once a registered delivery finishes. It defaults to the
    /// already-resolved [`DummyDeliveryFuture`].
    type DeliveryFuture: TryFuture<Ok = (), Error = SinkError> + Unpin + Send + 'static =
        DummyDeliveryFuture;

    /// Write `chunk`, registering any delivery futures through `add_future`.
    fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> impl Future<Output = Result<()>> + Send + 'a;

    /// Called for every barrier read from the log. The default implementation does nothing.
    fn barrier(&mut self, _is_checkpoint: bool) -> impl Future<Output = Result<()>> + Send + '_ {
        async { Ok(()) }
    }
}
67
68/// A free-form sink that may output in multiple formats and encodings. Examples include kafka,
69/// kinesis, nats and redis.
70///
71/// The implementor specifies required key & value type (likely string or bytes), as well as how to
72/// write a single pair. The provided `write_chunk` method would handle the interaction with a
73/// `SinkFormatter`.
74///
75/// Currently kafka takes `&mut self` while kinesis takes `&self`. So we use `&mut self` in trait
76/// but implement it for `&Kinesis`. This allows us to hold `&mut &Kinesis` and `&Kinesis`
77/// simultaneously, preventing the schema clone issue propagating from kafka to kinesis.
78pub trait FormattedSink {
79    type K;
80    type V;
81    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()>;
82
83    async fn write_chunk<F: SinkFormatter>(
84        &mut self,
85        chunk: StreamChunk,
86        formatter: &F,
87    ) -> Result<()>
88    where
89        F::K: SerTo<Self::K>,
90        F::V: SerTo<Self::V>,
91    {
92        for r in formatter.format_chunk(&chunk) {
93            let (event_key_object, event_object) = r?;
94
95            self.write_one(
96                event_key_object.map(SerTo::ser_to).transpose()?,
97                event_object.map(SerTo::ser_to).transpose()?,
98            )
99            .await?;
100        }
101
102        Ok(())
103    }
104}
105
/// Adapts a [`SinkWriter`] into a [`LogSinker`].
///
/// Its `LogSinker` implementation (available when `W: SinkWriter<CommitMetadata = ()>`) feeds
/// log items to the writer epoch by epoch.
pub struct LogSinkerOf<W> {
    /// The wrapped sink writer.
    writer: W,
    /// Metrics handle; the log consumer records checkpoint commit duration here.
    sink_writer_metrics: SinkWriterMetrics,
}
110
111impl<W> LogSinkerOf<W> {
112    pub fn new(writer: W, sink_writer_metrics: SinkWriterMetrics) -> Self {
113        LogSinkerOf {
114            writer,
115            sink_writer_metrics,
116        }
117    }
118}
119
/// Consumes the log store with a plain [`SinkWriter`], one epoch at a time.
///
/// - `begin_epoch` is called before the very first item, and again before the first item that
///   follows each barrier. Items inside an already-open epoch only check that the epoch never
///   decreases.
/// - Chunks are passed to `write_batch`. If that fails, `abort` is called and the write error is
///   returned. If `abort` itself fails, the abort error is returned instead.
/// - Every barrier is forwarded to `barrier`. Only checkpoint barriers truncate the log reader,
///   and only after `barrier(true)` has succeeded. The time spent in `barrier(true)` is recorded
///   in milliseconds in `sink_commit_duration`.
///
/// The loop has no exit other than an error, hence the `Result<!>` return type.
#[async_trait]
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for LogSinkerOf<W> {
    async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        let mut sink_writer = self.writer;
        let metrics = self.sink_writer_metrics;
        // Where the consumer currently is in the epoch lifecycle.
        #[derive(Debug)]
        enum LogConsumerState {
            /// Mark that the log consumer is not initialized yet
            Uninitialized,

            /// Mark that a new epoch has begun.
            EpochBegun { curr_epoch: u64 },

            /// Mark that the consumer has just received a barrier; `prev_epoch` is the epoch
            /// that barrier closed.
            BarrierReceived { prev_epoch: u64 },
        }

        let mut state = LogConsumerState::Uninitialized;

        loop {
            let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
            // Open an epoch via `begin_epoch` if none is open yet. Every arm yields
            // `EpochBegun { curr_epoch: epoch }`, which the barrier handling below relies on.
            state = match state {
                LogConsumerState::Uninitialized => {
                    sink_writer.begin_epoch(epoch).await?;
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
                LogConsumerState::EpochBegun { curr_epoch } => {
                    // Still inside the open epoch: no `begin_epoch`, only a monotonicity check.
                    assert!(
                        epoch >= curr_epoch,
                        "new epoch {} should not be below the current epoch {}",
                        epoch,
                        curr_epoch
                    );
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
                LogConsumerState::BarrierReceived { prev_epoch } => {
                    // The first item after a barrier must belong to a strictly later epoch.
                    assert!(
                        epoch > prev_epoch,
                        "new epoch {} should be greater than prev epoch {}",
                        epoch,
                        prev_epoch
                    );
                    sink_writer.begin_epoch(epoch).await?;
                    LogConsumerState::EpochBegun { curr_epoch: epoch }
                }
            };
            match item {
                LogStoreReadItem::StreamChunk { chunk, .. } => {
                    if let Err(e) = sink_writer.write_batch(chunk).await {
                        // NOTE(review): if `abort` fails, `?` returns the abort error and the
                        // original write error `e` is lost; consider reporting `e` regardless.
                        sink_writer.abort().await?;
                        return Err(e);
                    }
                }
                LogStoreReadItem::Barrier {
                    is_checkpoint,
                    new_vnode_bitmap,
                    ..
                } => {
                    let prev_epoch = match state {
                        LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
                        _ => unreachable!("epoch must have begun before handling barrier"),
                    };
                    if is_checkpoint {
                        let start_time = Instant::now();
                        sink_writer.barrier(true).await?;
                        metrics
                            .sink_commit_duration
                            .observe(start_time.elapsed().as_millis() as f64);
                        // Truncate only after the commit above has succeeded.
                        log_reader.truncate(TruncateOffset::Barrier { epoch })?;
                    } else {
                        // A vnode bitmap change is only accepted on checkpoint barriers.
                        // NOTE(review): on checkpoint barriers `new_vnode_bitmap` is not used by
                        // this sinker; confirm that is intended.
                        assert!(new_vnode_bitmap.is_none());
                        sink_writer.barrier(false).await?;
                    }
                    state = LogConsumerState::BarrierReceived { prev_epoch }
                }
            }
        }
    }
}
201
202#[easy_ext::ext(SinkWriterExt)]
203impl<T> T
204where
205    T: SinkWriter<CommitMetadata = ()> + Sized,
206{
207    pub fn into_log_sinker(self, sink_writer_metrics: SinkWriterMetrics) -> LogSinkerOf<Self> {
208        LogSinkerOf {
209            writer: self,
210            sink_writer_metrics,
211        }
212    }
213}
214
/// Adapts an [`AsyncTruncateSinkWriter`] into a [`LogSinker`].
///
/// The log is truncated only to offsets reported by the wrapped [`DeliveryFutureManager`].
/// The trait bound is needed here because the field type names `W::DeliveryFuture`.
pub struct AsyncTruncateLogSinkerOf<W: AsyncTruncateSinkWriter> {
    /// The wrapped sink writer.
    writer: W,
    /// Tracks the delivery futures registered by `writer` and yields truncate offsets.
    future_manager: DeliveryFutureManager<W::DeliveryFuture>,
}
219
220impl<W: AsyncTruncateSinkWriter> AsyncTruncateLogSinkerOf<W> {
221    pub fn new(writer: W, max_future_count: usize) -> Self {
222        AsyncTruncateLogSinkerOf {
223            writer,
224            future_manager: DeliveryFutureManager::new(max_future_count),
225        }
226    }
227}
228
/// Consumes the log store with an [`AsyncTruncateSinkWriter`].
///
/// Each iteration races the next log item against the next truncate offset from the
/// [`DeliveryFutureManager`]. Log items are handed to the writer: a chunk is first registered
/// with the manager, and a barrier is registered after the writer's `barrier` succeeds. Truncate
/// offsets are applied to the log reader as the manager produces them.
///
/// The loop has no exit other than an error, hence the `Result<!>` return type.
#[async_trait]
impl<W: AsyncTruncateSinkWriter> LogSinker for AsyncTruncateLogSinkerOf<W> {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            // `select` requires `Unpin` futures, hence `pin!`. `drop_either_future` keeps only
            // the output of whichever future finished and discards the unfinished one, so the
            // arms below can use `log_reader` / `self.future_manager` mutably again.
            // NOTE(review): this relies on `next_item()` and `next_truncate_offset()` being safe
            // to drop before completion; confirm both are cancel-safe.
            let select_result = drop_either_future(
                select(
                    pin!(log_reader.next_item()),
                    pin!(self.future_manager.next_truncate_offset()),
                )
                .await,
            );
            match select_result {
                Either::Left(item_result) => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            // Register the chunk first so the writer can attach delivery futures.
                            let add_future = self.future_manager.start_write_chunk(epoch, chunk_id);
                            self.writer.write_chunk(chunk, add_future).await?;
                        }
                        LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                            self.writer.barrier(is_checkpoint).await?;
                            self.future_manager.add_barrier(epoch);
                        }
                    }
                }
                Either::Right(offset_result) => {
                    // The manager reported an offset that is safe to truncate to.
                    let offset = offset_result?;
                    log_reader.truncate(offset)?;
                }
            }
        }
    }
}
263
264#[easy_ext::ext(AsyncTruncateSinkWriterExt)]
265impl<T> T
266where
267    T: AsyncTruncateSinkWriter + Sized,
268{
269    pub fn into_log_sinker(self, max_future_count: usize) -> AsyncTruncateLogSinkerOf<Self> {
270        AsyncTruncateLogSinkerOf::new(self, max_future_count)
271    }
272}