risingwave_stream/common/log_store_impl/in_mem.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use futures::FutureExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult,
    LogWriter, LogWriterPostFlushCurrentEpoch, TruncateOffset,
};
use tokio::sync::mpsc::{
    Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
};
use tokio::sync::oneshot;

use crate::common::log_store_impl::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};

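/// Item passed from the log writer to the log reader over the bounded channel:
/// either a stream chunk belonging to the current epoch, or a barrier that
/// seals the current epoch and carries the next one.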
enum InMemLogStoreItem {
    StreamChunk(StreamChunk),
    Barrier {
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    },
}

/// An in-memory log store that buffers a bounded number of stream chunks in
/// memory via a bounded mpsc channel.
///
/// Since it is in-memory, when `flush_current_epoch` is called with a checkpoint
/// epoch, it waits for the reader to finish consuming all the data of the
/// current checkpoint epoch.
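///
/// A minimal sketch of the checkpoint handshake, assuming a paired reader and
/// writer from [`BoundedInMemLogStoreFactory::build`] (names are illustrative):
///
/// ```ignore
/// writer.write_chunk(chunk).await?;
/// // Blocks until the reader has consumed the epoch and truncated at its
/// // barrier, because `options.is_checkpoint` is true:
/// writer.flush_current_epoch(next_epoch, options).await?;
/// ```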
pub struct BoundedInMemLogStoreWriter {
    /// Current epoch. Should be `Some` after `init`.
    curr_epoch: Option<u64>,

    /// Holder of the oneshot channel to send the initial epoch to the associated log reader.
    init_epoch_tx: Option<oneshot::Sender<u64>>,

    /// Sender of log store items to the log reader.
    item_tx: Sender<InMemLogStoreItem>,

    /// Receiver for the epochs consumed by the log reader.
    truncated_epoch_rx: UnboundedReceiver<u64>,
}

#[derive(Eq, PartialEq, Debug)]
enum LogReaderEpochProgress {
    /// Consuming data in the current epoch.
    Consuming(u64),
    /// Finished emitting the data of a checkpoint epoch, and waiting for a call to `truncate`.
    AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 },
}

const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH);

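/// In-memory log reader paired with a [`BoundedInMemLogStoreWriter`].
///
/// A minimal consume-then-truncate sketch (illustrative only):
///
/// ```ignore
/// let (epoch, item) = reader.next_item().await?;
/// if let LogStoreReadItem::Barrier { is_checkpoint: true, .. } = item {
///     // Unblocks the writer waiting in `flush_current_epoch`:
///     reader.truncate(TruncateOffset::Barrier { epoch })?;
/// }
/// ```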
pub struct BoundedInMemLogStoreReader {
    /// Current progress of the log reader. Either consuming an epoch, or finished
    /// consuming an epoch and waiting to be truncated.
    epoch_progress: LogReaderEpochProgress,

    /// Holder of the oneshot channel to receive the initial epoch.
    init_epoch_rx: Option<oneshot::Receiver<u64>>,

    /// Receiver to fetch log store items.
    item_rx: Receiver<InMemLogStoreItem>,

    /// Sender of consumed epochs to the log writer.
    truncated_epoch_tx: UnboundedSender<u64>,

    /// Offset of the latest emitted item.
    latest_offset: TruncateOffset,

    /// Offset of the latest truncated item.
    truncate_offset: TruncateOffset,
}

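/// Factory that builds a paired [`BoundedInMemLogStoreReader`] and
/// [`BoundedInMemLogStoreWriter`] sharing a bounded channel of the given size.
///
/// A minimal usage sketch, assuming a tokio runtime (mirrors the test below):
///
/// ```ignore
/// let factory = BoundedInMemLogStoreFactory::new(4);
/// let (mut reader, mut writer) = factory.build().await;
/// // The writer must `init` first: the reader's `init` awaits the initial
/// // epoch on a oneshot channel that the writer populates.
/// ```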
pub struct BoundedInMemLogStoreFactory {
    bound: usize,
}

impl BoundedInMemLogStoreFactory {
    pub fn new(bound: usize) -> Self {
        Self { bound }
    }
}

impl LogStoreFactory for BoundedInMemLogStoreFactory {
    type Reader = BoundedInMemLogStoreReader;
    type Writer = BoundedInMemLogStoreWriter;

    const ALLOW_REWIND: bool = false;
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool = false;

    async fn build(self) -> (Self::Reader, Self::Writer) {
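        // Three channels wire the pair together: a oneshot for the initial
        // epoch, a bounded channel for items (providing backpressure on the
        // writer), and an unbounded channel for truncated-epoch acks.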
        let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
        let (item_tx, item_rx) = channel(self.bound);
        let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel();
        let reader = BoundedInMemLogStoreReader {
            epoch_progress: UNINITIALIZED,
            init_epoch_rx: Some(init_epoch_rx),
            item_rx,
            truncated_epoch_tx,
            latest_offset: TruncateOffset::Barrier { epoch: 0 },
            truncate_offset: TruncateOffset::Barrier { epoch: 0 },
        };
        let writer = BoundedInMemLogStoreWriter {
            curr_epoch: None,
            init_epoch_tx: Some(init_epoch_tx),
            item_tx,
            truncated_epoch_rx,
        };
        (reader, writer)
    }
}

impl LogReader for BoundedInMemLogStoreReader {
    async fn init(&mut self) -> LogStoreResult<()> {
        let init_epoch_rx = self
            .init_epoch_rx
            .take()
            .expect("should not init twice");
        let epoch = init_epoch_rx.await.context("unable to get init epoch")?;
        assert_eq!(self.epoch_progress, UNINITIALIZED);
        self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
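        // Both offsets start at the barrier of the previous epoch, so that the
        // first chunk of `epoch` is assigned chunk id 0 and truncation below
        // `epoch` is rejected.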
        self.latest_offset = TruncateOffset::Barrier {
            epoch: epoch.prev_epoch(),
        };
        self.truncate_offset = TruncateOffset::Barrier {
            epoch: epoch.prev_epoch(),
        };
        Ok(())
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        match self.item_rx.recv().await {
            Some(item) => match self.epoch_progress {
                Consuming(current_epoch) => match item {
                    InMemLogStoreItem::StreamChunk(chunk) => {
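                        // Chunk ids are assigned per epoch: continue from the
                        // previous chunk within the same epoch, or restart at 0
                        // right after a barrier.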
                        let chunk_id = match self.latest_offset {
                            TruncateOffset::Chunk { epoch, chunk_id } => {
                                assert_eq!(epoch, current_epoch);
                                chunk_id + 1
                            }
                            TruncateOffset::Barrier { epoch } => {
                                assert!(
                                    epoch < current_epoch,
                                    "prev offset at barrier {} but current epoch {}",
                                    epoch,
                                    current_epoch
                                );
                                0
                            }
                        };
                        self.latest_offset = TruncateOffset::Chunk {
                            epoch: current_epoch,
                            chunk_id,
                        };
                        Ok((
                            current_epoch,
                            LogStoreReadItem::StreamChunk { chunk, chunk_id },
                        ))
                    }
                    InMemLogStoreItem::Barrier {
                        next_epoch,
                        options,
                    } => {
                        if options.is_checkpoint {
                            self.epoch_progress = AwaitingTruncate {
                                next_epoch,
                                sealed_epoch: current_epoch,
                            };
                        } else {
                            self.epoch_progress = Consuming(next_epoch);
                        }
                        self.latest_offset = TruncateOffset::Barrier {
                            epoch: current_epoch,
                        };
                        Ok((
                            current_epoch,
                            LogStoreReadItem::Barrier {
                                is_checkpoint: options.is_checkpoint,
                                new_vnode_bitmap: options.new_vnode_bitmap,
                                is_stop: options.is_stop,
                            },
                        ))
                    }
                },
                AwaitingTruncate { .. } => Err(anyhow!(
                    "should not call next_item while awaiting truncate on a checkpoint barrier for in-mem log store"
                )),
            },
            None => Err(anyhow!("end of log stream")),
        }
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        // Check that the truncate offset is higher than the previous truncate offset.
        if self.truncate_offset >= offset {
            return Err(anyhow!(
                "truncate offset {:?} but prev truncate offset is {:?}",
                offset,
                self.truncate_offset
            ));
        }

        // Check that the truncate offset does not exceed the latest possible offset.
        if offset > self.latest_offset {
            return Err(anyhow!(
                "truncate at {:?} but latest offset is {:?}",
                offset,
                self.latest_offset
            ));
        }

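        // Truncating at the barrier of the sealed checkpoint epoch completes
        // the checkpoint handshake: notify the writer, which is blocked in
        // `flush_current_epoch`, and resume consuming the next epoch.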
        if let AwaitingTruncate {
            sealed_epoch,
            next_epoch,
        } = &self.epoch_progress
            && let TruncateOffset::Barrier { epoch } = offset
            && epoch == *sealed_epoch
        {
            let sealed_epoch = *sealed_epoch;
            self.epoch_progress = Consuming(*next_epoch);
            self.truncated_epoch_tx
                .send(sealed_epoch)
                .map_err(|_| anyhow!("unable to send sealed epoch"))?;
        }
        self.truncate_offset = offset;
        Ok(())
    }

    async fn rewind(&mut self) -> LogStoreResult<()> {
        Err(anyhow!("should not call rewind on in-mem log store"))
    }

    async fn start_from(&mut self, _start_offset: Option<u64>) -> LogStoreResult<()> {
        Ok(())
    }
}

impl LogWriter for BoundedInMemLogStoreWriter {
    async fn init(
        &mut self,
        epoch: EpochPair,
        _pause_read_on_bootstrap: bool,
    ) -> LogStoreResult<()> {
        let init_epoch_tx = self.init_epoch_tx.take().expect("should not init twice");
        init_epoch_tx
            .send(epoch.curr)
            .map_err(|_| anyhow!("unable to send init epoch"))?;
        self.curr_epoch = Some(epoch.curr);
        Ok(())
    }

    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
        self.item_tx
            .send(InMemLogStoreItem::StreamChunk(chunk))
            .instrument_await("in_mem_send_item_chunk")
            .await
            .map_err(|_| anyhow!("unable to send stream chunk"))?;
        Ok(())
    }

    async fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
        let is_checkpoint = options.is_checkpoint;
        self.item_tx
            .send(InMemLogStoreItem::Barrier {
                next_epoch,
                options,
            })
            .instrument_await("in_mem_send_item_barrier")
            .await
            .map_err(|_| anyhow!("unable to send barrier"))?;

        let prev_epoch = self
            .curr_epoch
            .replace(next_epoch)
            .expect("should have epoch");

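        // For a checkpoint epoch, block until the reader reports via `truncate`
        // that it has fully consumed the epoch just sealed.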
        if is_checkpoint {
            let truncated_epoch = self
                .truncated_epoch_rx
                .recv()
                .instrument_await("in_mem_recv_truncated_epoch")
                .await
                .ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
            assert_eq!(truncated_epoch, prev_epoch);
        }

        Ok(LogWriterPostFlushCurrentEpoch::new(move || {
            async move { Ok(()) }.boxed()
        }))
    }

    fn pause(&mut self) -> LogStoreResult<()> {
        // no-op when decouple is not enabled
        Ok(())
    }

    fn resume(&mut self) -> LogStoreResult<()> {
        // no-op when decouple is not enabled
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::future::poll_fn;
    use std::task::Poll;

    use futures::FutureExt;
    use risingwave_common::array::{Op, StreamChunkBuilder};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_connector::sink::log_store::{
        LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
    };

    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::common::log_store_impl::kv_log_store::test_utils::LogWriterTestExt;

    #[tokio::test]
    async fn test_in_memory_log_store() {
        let factory = BoundedInMemLogStoreFactory::new(4);
        let (mut reader, mut writer) = factory.build().await;

        let init_epoch = test_epoch(1);
        let epoch1 = test_epoch(2);
        let epoch2 = test_epoch(3);

        let ops = vec![Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete];
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int64, DataType::Varchar], None);
        for (i, op) in ops.into_iter().enumerate() {
            assert!(
                builder
                    .append_row(
                        op,
                        [
                            Some(ScalarImpl::Int64(i as i64)),
                            Some(ScalarImpl::Utf8(format!("name_{}", i).into_boxed_str()))
                        ]
                    )
                    .is_none()
            );
        }
        let stream_chunk = builder.take().unwrap();
        let stream_chunk_clone = stream_chunk.clone();

        let mut join_handle = tokio::spawn(async move {
            writer
                .init(EpochPair::new_test_epoch(init_epoch), false)
                .await
                .unwrap();
            writer
                .write_chunk(stream_chunk_clone.clone())
                .await
                .unwrap();
            writer
                .write_chunk(stream_chunk_clone.clone())
                .await
                .unwrap();
            writer
                .flush_current_epoch_for_test(epoch1, false)
                .await
                .unwrap();
            writer.write_chunk(stream_chunk_clone).await.unwrap();
            writer
                .flush_current_epoch_for_test(epoch2, true)
                .await
                .unwrap();
        });

        reader.init().await.unwrap();
        let _chunk_id1_1 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(epoch, init_epoch);
                assert_eq!(&chunk, &stream_chunk);
                chunk_id
            }
            _ => unreachable!(),
        };

        let chunk_id1_2 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(epoch, init_epoch);
                assert_eq!(&chunk, &stream_chunk);
                chunk_id
            }
            _ => unreachable!(),
        };

        match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) => {
                assert!(!is_checkpoint);
                assert_eq!(epoch, init_epoch);
            }
            _ => unreachable!(),
        }

        let chunk_id2_1 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(&chunk, &stream_chunk);
                assert_eq!(epoch, epoch1);
                chunk_id
            }
            _ => unreachable!(),
        };

        match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) => {
                assert!(is_checkpoint);
                assert_eq!(epoch, epoch1);
            }
            _ => unreachable!(),
        }

        reader
            .truncate(TruncateOffset::Chunk {
                epoch: init_epoch,
                chunk_id: chunk_id1_2,
            })
            .unwrap();
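        // The writer task is still blocked in its checkpoint flush: truncating
        // at a chunk offset does not send the sealed epoch; only truncating at
        // the epoch1 barrier below does.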
        assert!(
            poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
                .await
                .is_pending()
        );
        reader
            .truncate(TruncateOffset::Chunk {
                epoch: epoch1,
                chunk_id: chunk_id2_1,
            })
            .unwrap();
        assert!(
            poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
                .await
                .is_pending()
        );
        reader
            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
            .unwrap();
        join_handle.await.unwrap();
    }
}
461}