risingwave_stream/common/log_store_impl/
in_mem.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use await_tree::InstrumentAwait;
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use risingwave_common::array::StreamChunk;
20use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
21use risingwave_connector::sink::log_store::{
22    FlushCurrentEpochOptions, LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult,
23    LogWriter, LogWriterPostFlushCurrentEpoch, TruncateOffset,
24};
25use tokio::sync::mpsc::{
26    Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
27};
28use tokio::sync::oneshot;
29
30use crate::common::log_store_impl::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};
31use crate::executor::StreamExecutorResult;
32
/// A single entry handed from the log writer to the log reader over the bounded item channel.
enum InMemLogStoreItem {
    /// A stream chunk passed to the writer's `write_chunk`.
    StreamChunk(StreamChunk),
    /// Emitted by the writer's `flush_current_epoch` to end the current epoch.
    Barrier {
        /// Epoch the reader moves on to after this barrier (immediately for a non-checkpoint
        /// barrier, after `truncate` on the barrier for a checkpoint barrier).
        next_epoch: u64,
        /// Flush options supplied by the writer; the reader copies `is_checkpoint`,
        /// `new_vnode_bitmap`, `is_stop` and `add_columns` into the barrier item it emits.
        options: FlushCurrentEpochOptions,
    },
}
40
/// Writer half of an in-memory log store that buffers a bounded number of items in memory via a
/// bounded mpsc channel.
///
/// Since the store is in-memory, `flush_current_epoch` on a checkpoint epoch waits until the
/// reader reports that it has truncated that epoch before returning.
pub struct BoundedInMemLogStoreWriter {
    /// Current epoch. `None` until `init` succeeds, `Some` afterwards.
    curr_epoch: Option<u64>,

    /// Holder of the oneshot sender used to pass the initial epoch to the associated log reader.
    /// Taken by `init`.
    init_epoch_tx: Option<oneshot::Sender<u64>>,

    /// Sends log store items (chunks and barriers) to the log reader.
    item_tx: Sender<InMemLogStoreItem>,

    /// Receives the checkpoint epochs that the log reader has truncated.
    truncated_epoch_rx: UnboundedReceiver<u64>,

    /// Callback awaited once in `init` with the initial epoch pair, before the initial epoch is
    /// sent to the reader. Taken by `init`.
    wait_init_epoch: Option<WaitInitEpochFn>,
}
61
/// Where the log reader currently stands relative to epoch boundaries.
#[derive(Eq, PartialEq, Debug)]
enum LogReaderEpochProgress {
    /// Still consuming data of the contained epoch.
    Consuming(u64),
    /// The checkpoint barrier of `sealed_epoch` has been emitted; the reader stays in this state
    /// until `truncate` is called on that barrier, after which it consumes `next_epoch`.
    AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 },
}
69
/// Placeholder progress a freshly built reader holds until `init` has received the initial epoch.
const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH);
71
/// Reader half of the bounded in-memory log store.
pub struct BoundedInMemLogStoreReader {
    /// Current progress of the log reader: either consuming an epoch, or done emitting a
    /// checkpoint epoch and waiting for it to be truncated.
    epoch_progress: LogReaderEpochProgress,

    /// Holder of the oneshot receiver for the initial epoch. Taken by `init`.
    init_epoch_rx: Option<oneshot::Receiver<u64>>,

    /// Receives log store items from the log writer.
    item_rx: Receiver<InMemLogStoreItem>,

    /// Reports truncated checkpoint epochs back to the log writer.
    truncated_epoch_tx: UnboundedSender<u64>,

    /// Offset of the latest emitted item; `truncate` rejects offsets beyond it.
    latest_offset: TruncateOffset,

    /// Offset of the latest truncated item; `truncate` rejects offsets that do not advance it.
    truncate_offset: TruncateOffset,
}
92
/// Boxed one-shot callback that takes the initial [`EpochPair`] and returns a boxed future
/// resolving to a `StreamExecutorResult<()>`. The writer awaits it during `init`.
type WaitInitEpochFn =
    Box<dyn FnOnce(EpochPair) -> BoxFuture<'static, StreamExecutorResult<()>> + Send + 'static>;
95
/// Factory that builds a connected [`BoundedInMemLogStoreReader`] /
/// [`BoundedInMemLogStoreWriter`] pair.
pub struct BoundedInMemLogStoreFactory {
    /// Capacity of the bounded item channel between writer and reader.
    bound: usize,
    /// Callback handed over to the writer, which awaits it during `init`.
    wait_init_epoch: WaitInitEpochFn,
}
100
101impl BoundedInMemLogStoreFactory {
102    pub fn new(
103        bound: usize,
104        wait_init_epoch: impl FnOnce(EpochPair) -> BoxFuture<'static, StreamExecutorResult<()>>
105        + Send
106        + 'static,
107    ) -> Self {
108        Self {
109            bound,
110            wait_init_epoch: Box::new(wait_init_epoch),
111        }
112    }
113
114    #[cfg(test)]
115    pub fn for_test(bound: usize) -> Self {
116        Self {
117            bound,
118            wait_init_epoch: Box::new(|_x| std::future::ready(Ok(())).boxed()),
119        }
120    }
121}
122
123impl LogStoreFactory for BoundedInMemLogStoreFactory {
124    type Reader = BoundedInMemLogStoreReader;
125    type Writer = BoundedInMemLogStoreWriter;
126
127    const ALLOW_REWIND: bool = false;
128    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool = false;
129
130    async fn build(self) -> (Self::Reader, Self::Writer) {
131        let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
132        let (item_tx, item_rx) = channel(self.bound);
133        let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel();
134        let reader = BoundedInMemLogStoreReader {
135            epoch_progress: UNINITIALIZED,
136            init_epoch_rx: Some(init_epoch_rx),
137            item_rx,
138            truncated_epoch_tx,
139            latest_offset: TruncateOffset::Barrier { epoch: 0 },
140            truncate_offset: TruncateOffset::Barrier { epoch: 0 },
141        };
142        let writer = BoundedInMemLogStoreWriter {
143            curr_epoch: None,
144            init_epoch_tx: Some(init_epoch_tx),
145            item_tx,
146            truncated_epoch_rx,
147            wait_init_epoch: Some(self.wait_init_epoch),
148        };
149        (reader, writer)
150    }
151}
152
153impl LogReader for BoundedInMemLogStoreReader {
154    async fn init(&mut self) -> LogStoreResult<()> {
155        let init_epoch_rx = self
156            .init_epoch_rx
157            .take()
158            .expect("should not init for twice");
159        let epoch = init_epoch_rx.await.context("unable to get init epoch")?;
160        assert_eq!(self.epoch_progress, UNINITIALIZED);
161        self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
162        self.latest_offset = TruncateOffset::Barrier {
163            epoch: epoch.prev_epoch(),
164        };
165        self.truncate_offset = TruncateOffset::Barrier {
166            epoch: epoch.prev_epoch(),
167        };
168        Ok(())
169    }
170
171    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
172        match self.item_rx.recv().await {
173            Some(item) => match self.epoch_progress {
174                Consuming(current_epoch) => match item {
175                    InMemLogStoreItem::StreamChunk(chunk) => {
176                        let chunk_id = match self.latest_offset {
177                            TruncateOffset::Chunk { epoch, chunk_id } => {
178                                assert_eq!(epoch, current_epoch);
179                                chunk_id + 1
180                            }
181                            TruncateOffset::Barrier { epoch } => {
182                                assert!(
183                                    epoch < current_epoch,
184                                    "prev offset at barrier {} but current epoch {}",
185                                    epoch,
186                                    current_epoch
187                                );
188                                0
189                            }
190                        };
191                        self.latest_offset = TruncateOffset::Chunk {
192                            epoch: current_epoch,
193                            chunk_id,
194                        };
195                        Ok((
196                            current_epoch,
197                            LogStoreReadItem::StreamChunk { chunk, chunk_id },
198                        ))
199                    }
200                    InMemLogStoreItem::Barrier {
201                        next_epoch,
202                        options,
203                    } => {
204                        if options.is_checkpoint {
205                            self.epoch_progress = AwaitingTruncate {
206                                next_epoch,
207                                sealed_epoch: current_epoch,
208                            };
209                        } else {
210                            self.epoch_progress = Consuming(next_epoch);
211                        }
212                        self.latest_offset = TruncateOffset::Barrier {
213                            epoch: current_epoch,
214                        };
215                        Ok((
216                            current_epoch,
217                            LogStoreReadItem::Barrier {
218                                is_checkpoint: options.is_checkpoint,
219                                new_vnode_bitmap: options.new_vnode_bitmap,
220                                is_stop: options.is_stop,
221                                add_columns: options.add_columns,
222                            },
223                        ))
224                    }
225                },
226                AwaitingTruncate { .. } => Err(anyhow!(
227                    "should not call next_item on checkpoint barrier for in-mem log store"
228                )),
229            },
230            None => Err(anyhow!("end of log stream")),
231        }
232    }
233
234    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
235        // check the truncate offset is higher than prev truncate offset
236        if self.truncate_offset >= offset {
237            return Err(anyhow!(
238                "truncate offset {:?} but prev truncate offset is {:?}",
239                offset,
240                self.truncate_offset
241            ));
242        }
243
244        // check the truncate offset does not exceed the latest possible offset
245        if offset > self.latest_offset {
246            return Err(anyhow!(
247                "truncate at {:?} but latest offset is {:?}",
248                offset,
249                self.latest_offset
250            ));
251        }
252
253        if let AwaitingTruncate {
254            sealed_epoch,
255            next_epoch,
256        } = &self.epoch_progress
257            && let TruncateOffset::Barrier { epoch } = offset
258            && epoch == *sealed_epoch
259        {
260            let sealed_epoch = *sealed_epoch;
261            self.epoch_progress = Consuming(*next_epoch);
262            self.truncated_epoch_tx
263                .send(sealed_epoch)
264                .map_err(|_| anyhow!("unable to send sealed epoch"))?;
265        }
266        self.truncate_offset = offset;
267        Ok(())
268    }
269
270    async fn rewind(&mut self) -> LogStoreResult<()> {
271        Err(anyhow!("should not call rewind on it"))
272    }
273
274    async fn start_from(&mut self, _start_offset: Option<u64>) -> LogStoreResult<()> {
275        Ok(())
276    }
277}
278
279impl LogWriter for BoundedInMemLogStoreWriter {
280    async fn init(
281        &mut self,
282        epoch: EpochPair,
283        _pause_read_on_bootstrap: bool,
284    ) -> LogStoreResult<()> {
285        let init_epoch_tx = self.init_epoch_tx.take().expect("cannot be init for twice");
286        self.wait_init_epoch
287            .take()
288            .expect("cannot be init for in-mem log store")(epoch)
289        .await?;
290        init_epoch_tx
291            .send(epoch.curr)
292            .map_err(|_| anyhow!("unable to send init epoch"))?;
293        self.curr_epoch = Some(epoch.curr);
294        Ok(())
295    }
296
297    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
298        self.item_tx
299            .send(InMemLogStoreItem::StreamChunk(chunk))
300            .instrument_await("in_mem_send_item_chunk")
301            .await
302            .map_err(|_| anyhow!("unable to send stream chunk"))?;
303        Ok(())
304    }
305
306    async fn flush_current_epoch(
307        &mut self,
308        next_epoch: u64,
309        options: FlushCurrentEpochOptions,
310    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
311        let is_checkpoint = options.is_checkpoint;
312        self.item_tx
313            .send(InMemLogStoreItem::Barrier {
314                next_epoch,
315                options,
316            })
317            .instrument_await("in_mem_send_item_barrier")
318            .await
319            .map_err(|_| anyhow!("unable to send barrier"))?;
320
321        let prev_epoch = self
322            .curr_epoch
323            .replace(next_epoch)
324            .expect("should have epoch");
325
326        if is_checkpoint {
327            let truncated_epoch = self
328                .truncated_epoch_rx
329                .recv()
330                .instrument_await("in_mem_recv_truncated_epoch")
331                .await
332                .ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
333            assert_eq!(truncated_epoch, prev_epoch);
334        }
335
336        Ok(LogWriterPostFlushCurrentEpoch::new(move || {
337            async move { Ok(()) }.boxed()
338        }))
339    }
340
341    fn pause(&mut self) -> LogStoreResult<()> {
342        // no-op when decouple is not enabled
343        Ok(())
344    }
345
346    fn resume(&mut self) -> LogStoreResult<()> {
347        // no-op when decouple is not enabled
348        Ok(())
349    }
350}
351
#[cfg(test)]
mod tests {
    use std::future::poll_fn;
    use std::task::Poll;

    use futures::FutureExt;
    use risingwave_common::array::{Op, StreamChunkBuilder};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_connector::sink::log_store::{
        LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
    };

    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::common::log_store_impl::kv_log_store::test_utils::LogWriterTestExt;

    /// End-to-end check of the writer/reader pair: the writer runs in a spawned task and emits
    /// two chunks, a non-checkpoint barrier, one more chunk and a checkpoint barrier; the reader
    /// verifies the items it receives and that the writer task only completes once the
    /// checkpoint barrier has been truncated.
    #[tokio::test]
    async fn test_in_memory_log_store() {
        let factory = BoundedInMemLogStoreFactory::for_test(4);
        let (mut reader, mut writer) = factory.build().await;

        let init_epoch = test_epoch(1);
        let epoch1 = test_epoch(2);
        let epoch2 = test_epoch(3);

        // Build a four-row chunk, one row per op kind.
        let ops = vec![Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete];
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int64, DataType::Varchar], None);
        for (i, op) in ops.into_iter().enumerate() {
            assert!(
                builder
                    .append_row(
                        op,
                        [
                            Some(ScalarImpl::Int64(i as i64)),
                            Some(ScalarImpl::Utf8(format!("name_{}", i).into_boxed_str()))
                        ]
                    )
                    .is_none()
            );
        }
        let stream_chunk = builder.take().unwrap();
        let stream_chunk_clone = stream_chunk.clone();

        // Writer side. The second flush produces the barrier the reader later observes as a
        // checkpoint, so this task is expected to stay pending until that barrier is truncated.
        let mut join_handle = tokio::spawn(async move {
            writer
                .init(EpochPair::new_test_epoch(init_epoch), false)
                .await
                .unwrap();
            writer
                .write_chunk(stream_chunk_clone.clone())
                .await
                .unwrap();
            writer
                .write_chunk(stream_chunk_clone.clone())
                .await
                .unwrap();
            writer
                .flush_current_epoch_for_test(epoch1, false)
                .await
                .unwrap();
            writer.write_chunk(stream_chunk_clone).await.unwrap();
            writer
                .flush_current_epoch_for_test(epoch2, true)
                .await
                .unwrap();
        });

        // Reader side: the first two chunks belong to the initial epoch.
        reader.init().await.unwrap();
        let _chunk_id1_1 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(epoch, init_epoch);
                assert_eq!(&chunk, &stream_chunk);
                chunk_id
            }
            _ => unreachable!(),
        };

        let chunk_id1_2 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(epoch, init_epoch);
                assert_eq!(&chunk, &stream_chunk);
                chunk_id
            }
            _ => unreachable!(),
        };

        // Non-checkpoint barrier closing the initial epoch.
        match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) => {
                assert!(!is_checkpoint);
                assert_eq!(epoch, init_epoch);
            }
            _ => unreachable!(),
        }

        // The third chunk is reported under `epoch1`.
        let chunk_id2_1 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(&chunk, &stream_chunk);
                assert_eq!(epoch, epoch1);
                chunk_id
            }
            _ => unreachable!(),
        };

        // Checkpoint barrier closing `epoch1`.
        match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) => {
                assert!(is_checkpoint);
                assert_eq!(epoch, epoch1);
            }
            _ => unreachable!(),
        }

        // Truncating chunk offsets must not release the writer: after each one, a single poll
        // of the writer task has to report it as still pending.
        reader
            .truncate(TruncateOffset::Chunk {
                epoch: init_epoch,
                chunk_id: chunk_id1_2,
            })
            .unwrap();
        assert!(
            poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
                .await
                .is_pending()
        );
        reader
            .truncate(TruncateOffset::Chunk {
                epoch: epoch1,
                chunk_id: chunk_id2_1,
            })
            .unwrap();
        assert!(
            poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
                .await
                .is_pending()
        );
        // Truncating the checkpoint barrier lets the writer task run to completion.
        reader
            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
            .unwrap();
        join_handle.await.unwrap();
    }
}