risingwave_stream/common/log_store_impl/in_mem.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use futures::FutureExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
use risingwave_connector::sink::log_store::{
    FlushCurrentEpochOptions, LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult,
    LogWriter, LogWriterPostFlushCurrentEpoch, TruncateOffset,
};
use tokio::sync::mpsc::{
    Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
};
use tokio::sync::oneshot;

use crate::common::log_store_impl::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};

enum InMemLogStoreItem {
    StreamChunk(StreamChunk),
    Barrier {
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    },
}

/// An in-memory log store that buffers a bounded number of stream chunks in
/// memory via a bounded mpsc channel.
///
/// Since it is in-memory, when `flush_current_epoch` is called on a checkpoint
/// epoch, it waits for the reader to finish consuming all the data of the
/// current checkpoint epoch.
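///
/// A minimal usage sketch (illustrative only, not part of the original file;
/// it assumes a Tokio runtime and that `FlushCurrentEpochOptions` has exactly
/// the fields used in this module):
///
/// ```ignore
/// use risingwave_common::array::{Op, StreamChunkBuilder};
/// use risingwave_common::types::{DataType, ScalarImpl};
/// use risingwave_common::util::epoch::{EpochPair, test_epoch};
/// use risingwave_connector::sink::log_store::{
///     FlushCurrentEpochOptions, LogReader, LogStoreFactory, LogWriter,
/// };
///
/// // Build a one-row chunk to write.
/// let mut builder = StreamChunkBuilder::unlimited(vec![DataType::Int64], None);
/// assert!(builder.append_row(Op::Insert, [Some(ScalarImpl::Int64(1))]).is_none());
/// let chunk = builder.take().unwrap();
///
/// let factory = BoundedInMemLogStoreFactory::new(4);
/// let (mut reader, mut writer) = factory.build().await;
/// writer.init(EpochPair::new_test_epoch(test_epoch(1)), false).await?;
/// writer.write_chunk(chunk).await?;
/// // A non-checkpoint barrier returns without waiting for the reader.
/// let _post_flush = writer
///     .flush_current_epoch(
///         test_epoch(2),
///         FlushCurrentEpochOptions {
///             is_checkpoint: false,
///             new_vnode_bitmap: None,
///             is_stop: false,
///         },
///     )
///     .await?;
/// reader.init().await?;
/// let (epoch, item) = reader.next_item().await?;
/// ```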
pub struct BoundedInMemLogStoreWriter {
    /// Current epoch. Should be `Some` after `init`.
    curr_epoch: Option<u64>,

    /// Holder of the oneshot channel to send the initial epoch to the associated log reader.
    init_epoch_tx: Option<oneshot::Sender<u64>>,

    /// Sender of log store items to the log reader.
    item_tx: Sender<InMemLogStoreItem>,

    /// Receiver for the epochs consumed by the log reader.
    truncated_epoch_rx: UnboundedReceiver<u64>,
}

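/// State machine of the reader: `Consuming(epoch)` moves to
/// `AwaitingTruncate { sealed_epoch: epoch, next_epoch }` when a checkpoint
/// barrier is read in `next_item`, and back to `Consuming(next_epoch)` once
/// `truncate` is called at that barrier offset.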
#[derive(Eq, PartialEq, Debug)]
enum LogReaderEpochProgress {
    /// In the process of consuming data in the current epoch.
    Consuming(u64),
    /// Finished emitting the data in the checkpoint epoch, and waiting for a call to `truncate`.
    AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 },
}

const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH);

pub struct BoundedInMemLogStoreReader {
    /// Current progress of the log reader: either consuming an epoch, or finished consuming
    /// an epoch and waiting to be truncated.
    epoch_progress: LogReaderEpochProgress,

    /// Holder of the oneshot channel to receive the initial epoch.
    init_epoch_rx: Option<oneshot::Receiver<u64>>,

    /// Receiver to fetch log store items.
    item_rx: Receiver<InMemLogStoreItem>,

    /// Sender of consumed epochs to the log writer.
    truncated_epoch_tx: UnboundedSender<u64>,

    /// Offset of the latest emitted item.
    latest_offset: TruncateOffset,

    /// Offset of the latest truncated item.
    truncate_offset: TruncateOffset,
}

pub struct BoundedInMemLogStoreFactory {
    bound: usize,
}

impl BoundedInMemLogStoreFactory {
    pub fn new(bound: usize) -> Self {
        Self { bound }
    }
}

impl LogStoreFactory for BoundedInMemLogStoreFactory {
    type Reader = BoundedInMemLogStoreReader;
    type Writer = BoundedInMemLogStoreWriter;

    const ALLOW_REWIND: bool = false;
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool = false;

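    // `build` wires up three channels: a oneshot carrying the writer's init
    // epoch to the reader, a bounded mpsc of capacity `bound` carrying log
    // items, and an unbounded mpsc reporting truncated epochs back to the
    // writer.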
    async fn build(self) -> (Self::Reader, Self::Writer) {
        let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
        let (item_tx, item_rx) = channel(self.bound);
        let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel();
        let reader = BoundedInMemLogStoreReader {
            epoch_progress: UNINITIALIZED,
            init_epoch_rx: Some(init_epoch_rx),
            item_rx,
            truncated_epoch_tx,
            latest_offset: TruncateOffset::Barrier { epoch: 0 },
            truncate_offset: TruncateOffset::Barrier { epoch: 0 },
        };
        let writer = BoundedInMemLogStoreWriter {
            curr_epoch: None,
            init_epoch_tx: Some(init_epoch_tx),
            item_tx,
            truncated_epoch_rx,
        };
        (reader, writer)
    }
}

impl LogReader for BoundedInMemLogStoreReader {
    async fn init(&mut self) -> LogStoreResult<()> {
        let init_epoch_rx = self
            .init_epoch_rx
            .take()
            .expect("should not init twice");
        let epoch = init_epoch_rx.await.context("unable to get init epoch")?;
        assert_eq!(self.epoch_progress, UNINITIALIZED);
        self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
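        // Both offsets start at the barrier of the previous epoch, so the
        // first chunk emitted in the init epoch is assigned chunk id 0.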
        self.latest_offset = TruncateOffset::Barrier {
            epoch: epoch.prev_epoch(),
        };
        self.truncate_offset = TruncateOffset::Barrier {
            epoch: epoch.prev_epoch(),
        };
        Ok(())
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        match self.item_rx.recv().await {
            Some(item) => match self.epoch_progress {
                Consuming(current_epoch) => match item {
                    InMemLogStoreItem::StreamChunk(chunk) => {
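                        // Chunk ids restart from 0 after each barrier and
                        // increase monotonically within an epoch.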
                        let chunk_id = match self.latest_offset {
                            TruncateOffset::Chunk { epoch, chunk_id } => {
                                assert_eq!(epoch, current_epoch);
                                chunk_id + 1
                            }
                            TruncateOffset::Barrier { epoch } => {
                                assert!(
                                    epoch < current_epoch,
                                    "prev offset at barrier {} but current epoch {}",
                                    epoch,
                                    current_epoch
                                );
                                0
                            }
                        };
                        self.latest_offset = TruncateOffset::Chunk {
                            epoch: current_epoch,
                            chunk_id,
                        };
                        Ok((
                            current_epoch,
                            LogStoreReadItem::StreamChunk { chunk, chunk_id },
                        ))
                    }
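                    // A checkpoint barrier parks the reader in `AwaitingTruncate`;
                    // a non-checkpoint barrier moves straight on to the next epoch.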
                    InMemLogStoreItem::Barrier {
                        next_epoch,
                        options,
                    } => {
                        if options.is_checkpoint {
                            self.epoch_progress = AwaitingTruncate {
                                next_epoch,
                                sealed_epoch: current_epoch,
                            };
                        } else {
                            self.epoch_progress = Consuming(next_epoch);
                        }
                        self.latest_offset = TruncateOffset::Barrier {
                            epoch: current_epoch,
                        };
                        Ok((
                            current_epoch,
                            LogStoreReadItem::Barrier {
                                is_checkpoint: options.is_checkpoint,
                                new_vnode_bitmap: options.new_vnode_bitmap,
                                is_stop: options.is_stop,
                            },
                        ))
                    }
                },
                AwaitingTruncate { .. } => Err(anyhow!(
                    "should not call next_item while awaiting truncate on a checkpoint barrier for in-mem log store"
                )),
            },
            None => Err(anyhow!("end of log stream")),
        }
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        // Check that the truncate offset is higher than the previous truncate offset.
        if self.truncate_offset >= offset {
            return Err(anyhow!(
                "truncate at {:?} but prev truncate offset is {:?}",
                offset,
                self.truncate_offset
            ));
        }

        // Check that the truncate offset does not exceed the latest possible offset.
        if offset > self.latest_offset {
            return Err(anyhow!(
                "truncate at {:?} but latest offset is {:?}",
                offset,
                self.latest_offset
            ));
        }

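        // Truncating at the sealed checkpoint barrier releases the writer,
        // which is blocked in `flush_current_epoch` waiting for this epoch.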
        if let AwaitingTruncate {
            sealed_epoch,
            next_epoch,
        } = &self.epoch_progress
        {
            if let TruncateOffset::Barrier { epoch } = offset
                && epoch == *sealed_epoch
            {
                let sealed_epoch = *sealed_epoch;
                self.epoch_progress = Consuming(*next_epoch);
                self.truncated_epoch_tx
                    .send(sealed_epoch)
                    .map_err(|_| anyhow!("unable to send sealed epoch"))?;
            }
        }
        self.truncate_offset = offset;
        Ok(())
    }

    async fn rewind(&mut self) -> LogStoreResult<()> {
        Err(anyhow!("should not call rewind on in-mem log store"))
    }

    async fn start_from(&mut self, _start_offset: Option<u64>) -> LogStoreResult<()> {
        Ok(())
    }
}

impl LogWriter for BoundedInMemLogStoreWriter {
    async fn init(
        &mut self,
        epoch: EpochPair,
        _pause_read_on_bootstrap: bool,
    ) -> LogStoreResult<()> {
        let init_epoch_tx = self.init_epoch_tx.take().expect("should not init twice");
        init_epoch_tx
            .send(epoch.curr)
            .map_err(|_| anyhow!("unable to send init epoch"))?;
        self.curr_epoch = Some(epoch.curr);
        Ok(())
    }

    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
        self.item_tx
            .send(InMemLogStoreItem::StreamChunk(chunk))
            .instrument_await("in_mem_send_item_chunk")
            .await
            .map_err(|_| anyhow!("unable to send stream chunk"))?;
        Ok(())
    }

    async fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
        let is_checkpoint = options.is_checkpoint;
        self.item_tx
            .send(InMemLogStoreItem::Barrier {
                next_epoch,
                options,
            })
            .instrument_await("in_mem_send_item_barrier")
            .await
            .map_err(|_| anyhow!("unable to send barrier"))?;

        let prev_epoch = self
            .curr_epoch
            .replace(next_epoch)
            .expect("should have epoch");

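        // For a checkpoint epoch, block until the reader reports that it has
        // truncated up to the epoch just sealed. This is what gives the
        // in-memory store its "wait for the reader" semantics.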
        if is_checkpoint {
            let truncated_epoch = self
                .truncated_epoch_rx
                .recv()
                .instrument_await("in_mem_recv_truncated_epoch")
                .await
                .ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
            assert_eq!(truncated_epoch, prev_epoch);
        }

        Ok(LogWriterPostFlushCurrentEpoch::new(move || {
            async move { Ok(()) }.boxed()
        }))
    }

    fn pause(&mut self) -> LogStoreResult<()> {
        // no-op when decouple is not enabled
        Ok(())
    }

    fn resume(&mut self) -> LogStoreResult<()> {
        // no-op when decouple is not enabled
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::future::poll_fn;
    use std::task::Poll;

    use futures::FutureExt;
    use risingwave_common::array::{Op, StreamChunkBuilder};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_connector::sink::log_store::{
        LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
    };

    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::common::log_store_impl::kv_log_store::test_utils::LogWriterTestExt;

    #[tokio::test]
    async fn test_in_memory_log_store() {
        let factory = BoundedInMemLogStoreFactory::new(4);
        let (mut reader, mut writer) = factory.build().await;

        let init_epoch = test_epoch(1);
        let epoch1 = test_epoch(2);
        let epoch2 = test_epoch(3);

        let ops = vec![Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete];
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int64, DataType::Varchar], None);
        for (i, op) in ops.into_iter().enumerate() {
            assert!(
                builder
                    .append_row(
                        op,
                        [
                            Some(ScalarImpl::Int64(i as i64)),
                            Some(ScalarImpl::Utf8(format!("name_{}", i).into_boxed_str()))
                        ]
                    )
                    .is_none()
            );
        }
        let stream_chunk = builder.take().unwrap();
        let stream_chunk_clone = stream_chunk.clone();

        let mut join_handle = tokio::spawn(async move {
            writer
                .init(EpochPair::new_test_epoch(init_epoch), false)
                .await
                .unwrap();
            writer
                .write_chunk(stream_chunk_clone.clone())
                .await
                .unwrap();
            writer
                .write_chunk(stream_chunk_clone.clone())
                .await
                .unwrap();
            writer
                .flush_current_epoch_for_test(epoch1, false)
                .await
                .unwrap();
            writer.write_chunk(stream_chunk_clone).await.unwrap();
            writer
                .flush_current_epoch_for_test(epoch2, true)
                .await
                .unwrap();
        });

        reader.init().await.unwrap();
        let _chunk_id1_1 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(epoch, init_epoch);
                assert_eq!(&chunk, &stream_chunk);
                chunk_id
            }
            _ => unreachable!(),
        };

        let chunk_id1_2 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(epoch, init_epoch);
                assert_eq!(&chunk, &stream_chunk);
                chunk_id
            }
            _ => unreachable!(),
        };

        match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) => {
                assert!(!is_checkpoint);
                assert_eq!(epoch, init_epoch);
            }
            _ => unreachable!(),
        }

        let chunk_id2_1 = match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(&chunk, &stream_chunk);
                assert_eq!(epoch, epoch1);
                chunk_id
            }
            _ => unreachable!(),
        };

        match reader.next_item().await.unwrap() {
            (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) => {
                assert!(is_checkpoint);
                assert_eq!(epoch, epoch1);
            }
            _ => unreachable!(),
        }

        reader
            .truncate(TruncateOffset::Chunk {
                epoch: init_epoch,
                chunk_id: chunk_id1_2,
            })
            .unwrap();
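        // Truncating chunk offsets alone does not release the writer: it stays
        // blocked in the checkpoint flush until the barrier of `epoch1` is truncated.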
        assert!(
            poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
                .await
                .is_pending()
        );
        reader
            .truncate(TruncateOffset::Chunk {
                epoch: epoch1,
                chunk_id: chunk_id2_1,
            })
            .unwrap();
        assert!(
            poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
                .await
                .is_pending()
        );
        reader
            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
            .unwrap();
        join_handle.await.unwrap();
    }
}