risingwave_stream/common/log_store_impl/
in_mem.rs1use anyhow::{Context, anyhow};
16use await_tree::InstrumentAwait;
17use futures::FutureExt;
18use risingwave_common::array::StreamChunk;
19use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
20use risingwave_connector::sink::log_store::{
21 FlushCurrentEpochOptions, LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult,
22 LogWriter, LogWriterPostFlushCurrentEpoch, TruncateOffset,
23};
24use tokio::sync::mpsc::{
25 Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
26};
27use tokio::sync::oneshot;
28
29use crate::common::log_store_impl::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};
30
31enum InMemLogStoreItem {
32 StreamChunk(StreamChunk),
33 Barrier {
34 next_epoch: u64,
35 options: FlushCurrentEpochOptions,
36 },
37}
38
39pub struct BoundedInMemLogStoreWriter {
45 curr_epoch: Option<u64>,
47
48 init_epoch_tx: Option<oneshot::Sender<u64>>,
50
51 item_tx: Sender<InMemLogStoreItem>,
53
54 truncated_epoch_rx: UnboundedReceiver<u64>,
56}
57
/// Where the reader currently stands in the sequence of epochs.
#[derive(Debug, PartialEq, Eq)]
enum LogReaderEpochProgress {
    /// Items belonging to the contained epoch are being consumed.
    Consuming(u64),
    /// A checkpoint barrier closing `sealed_epoch` has been read. The reader switches to
    /// consuming `next_epoch` once a truncate at the barrier of `sealed_epoch` is accepted.
    AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 },
}
65
66const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH);
67
68pub struct BoundedInMemLogStoreReader {
69 epoch_progress: LogReaderEpochProgress,
72
73 init_epoch_rx: Option<oneshot::Receiver<u64>>,
75
76 item_rx: Receiver<InMemLogStoreItem>,
78
79 truncated_epoch_tx: UnboundedSender<u64>,
81
82 latest_offset: TruncateOffset,
84
85 truncate_offset: TruncateOffset,
87}
88
/// Factory producing a connected reader/writer pair of the bounded in-memory log store.
pub struct BoundedInMemLogStoreFactory {
    /// Capacity of the item channel created in `build`.
    bound: usize,
}
92
93impl BoundedInMemLogStoreFactory {
94 pub fn new(bound: usize) -> Self {
95 Self { bound }
96 }
97}
98
99impl LogStoreFactory for BoundedInMemLogStoreFactory {
100 type Reader = BoundedInMemLogStoreReader;
101 type Writer = BoundedInMemLogStoreWriter;
102
103 const ALLOW_REWIND: bool = false;
104 const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool = false;
105
106 async fn build(self) -> (Self::Reader, Self::Writer) {
107 let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
108 let (item_tx, item_rx) = channel(self.bound);
109 let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel();
110 let reader = BoundedInMemLogStoreReader {
111 epoch_progress: UNINITIALIZED,
112 init_epoch_rx: Some(init_epoch_rx),
113 item_rx,
114 truncated_epoch_tx,
115 latest_offset: TruncateOffset::Barrier { epoch: 0 },
116 truncate_offset: TruncateOffset::Barrier { epoch: 0 },
117 };
118 let writer = BoundedInMemLogStoreWriter {
119 curr_epoch: None,
120 init_epoch_tx: Some(init_epoch_tx),
121 item_tx,
122 truncated_epoch_rx,
123 };
124 (reader, writer)
125 }
126}
127
128impl LogReader for BoundedInMemLogStoreReader {
129 async fn init(&mut self) -> LogStoreResult<()> {
130 let init_epoch_rx = self
131 .init_epoch_rx
132 .take()
133 .expect("should not init for twice");
134 let epoch = init_epoch_rx.await.context("unable to get init epoch")?;
135 assert_eq!(self.epoch_progress, UNINITIALIZED);
136 self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
137 self.latest_offset = TruncateOffset::Barrier {
138 epoch: epoch.prev_epoch(),
139 };
140 self.truncate_offset = TruncateOffset::Barrier {
141 epoch: epoch.prev_epoch(),
142 };
143 Ok(())
144 }
145
146 async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
147 match self.item_rx.recv().await {
148 Some(item) => match self.epoch_progress {
149 Consuming(current_epoch) => match item {
150 InMemLogStoreItem::StreamChunk(chunk) => {
151 let chunk_id = match self.latest_offset {
152 TruncateOffset::Chunk { epoch, chunk_id } => {
153 assert_eq!(epoch, current_epoch);
154 chunk_id + 1
155 }
156 TruncateOffset::Barrier { epoch } => {
157 assert!(
158 epoch < current_epoch,
159 "prev offset at barrier {} but current epoch {}",
160 epoch,
161 current_epoch
162 );
163 0
164 }
165 };
166 self.latest_offset = TruncateOffset::Chunk {
167 epoch: current_epoch,
168 chunk_id,
169 };
170 Ok((
171 current_epoch,
172 LogStoreReadItem::StreamChunk { chunk, chunk_id },
173 ))
174 }
175 InMemLogStoreItem::Barrier {
176 next_epoch,
177 options,
178 } => {
179 if options.is_checkpoint {
180 self.epoch_progress = AwaitingTruncate {
181 next_epoch,
182 sealed_epoch: current_epoch,
183 };
184 } else {
185 self.epoch_progress = Consuming(next_epoch);
186 }
187 self.latest_offset = TruncateOffset::Barrier {
188 epoch: current_epoch,
189 };
190 Ok((
191 current_epoch,
192 LogStoreReadItem::Barrier {
193 is_checkpoint: options.is_checkpoint,
194 new_vnode_bitmap: options.new_vnode_bitmap,
195 is_stop: options.is_stop,
196 },
197 ))
198 }
199 },
200 AwaitingTruncate { .. } => Err(anyhow!(
201 "should not call next_item on checkpoint barrier for in-mem log store"
202 )),
203 },
204 None => Err(anyhow!("end of log stream")),
205 }
206 }
207
208 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
209 if self.truncate_offset >= offset {
211 return Err(anyhow!(
212 "truncate offset {:?} but prev truncate offset is {:?}",
213 offset,
214 self.truncate_offset
215 ));
216 }
217
218 if offset > self.latest_offset {
220 return Err(anyhow!(
221 "truncate at {:?} but latest offset is {:?}",
222 offset,
223 self.latest_offset
224 ));
225 }
226
227 if let AwaitingTruncate {
228 sealed_epoch,
229 next_epoch,
230 } = &self.epoch_progress
231 && let TruncateOffset::Barrier { epoch } = offset
232 && epoch == *sealed_epoch
233 {
234 let sealed_epoch = *sealed_epoch;
235 self.epoch_progress = Consuming(*next_epoch);
236 self.truncated_epoch_tx
237 .send(sealed_epoch)
238 .map_err(|_| anyhow!("unable to send sealed epoch"))?;
239 }
240 self.truncate_offset = offset;
241 Ok(())
242 }
243
244 async fn rewind(&mut self) -> LogStoreResult<()> {
245 Err(anyhow!("should not call rewind on it"))
246 }
247
248 async fn start_from(&mut self, _start_offset: Option<u64>) -> LogStoreResult<()> {
249 Ok(())
250 }
251}
252
253impl LogWriter for BoundedInMemLogStoreWriter {
254 async fn init(
255 &mut self,
256 epoch: EpochPair,
257 _pause_read_on_bootstrap: bool,
258 ) -> LogStoreResult<()> {
259 let init_epoch_tx = self.init_epoch_tx.take().expect("cannot be init for twice");
260 init_epoch_tx
261 .send(epoch.curr)
262 .map_err(|_| anyhow!("unable to send init epoch"))?;
263 self.curr_epoch = Some(epoch.curr);
264 Ok(())
265 }
266
267 async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
268 self.item_tx
269 .send(InMemLogStoreItem::StreamChunk(chunk))
270 .instrument_await("in_mem_send_item_chunk")
271 .await
272 .map_err(|_| anyhow!("unable to send stream chunk"))?;
273 Ok(())
274 }
275
276 async fn flush_current_epoch(
277 &mut self,
278 next_epoch: u64,
279 options: FlushCurrentEpochOptions,
280 ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
281 let is_checkpoint = options.is_checkpoint;
282 self.item_tx
283 .send(InMemLogStoreItem::Barrier {
284 next_epoch,
285 options,
286 })
287 .instrument_await("in_mem_send_item_barrier")
288 .await
289 .map_err(|_| anyhow!("unable to send barrier"))?;
290
291 let prev_epoch = self
292 .curr_epoch
293 .replace(next_epoch)
294 .expect("should have epoch");
295
296 if is_checkpoint {
297 let truncated_epoch = self
298 .truncated_epoch_rx
299 .recv()
300 .instrument_await("in_mem_recv_truncated_epoch")
301 .await
302 .ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
303 assert_eq!(truncated_epoch, prev_epoch);
304 }
305
306 Ok(LogWriterPostFlushCurrentEpoch::new(move || {
307 async move { Ok(()) }.boxed()
308 }))
309 }
310
311 fn pause(&mut self) -> LogStoreResult<()> {
312 Ok(())
314 }
315
316 fn resume(&mut self) -> LogStoreResult<()> {
317 Ok(())
319 }
320}
321
#[cfg(test)]
mod tests {
    use std::future::poll_fn;
    use std::task::Poll;

    use futures::FutureExt;
    use risingwave_common::array::{Op, StreamChunkBuilder};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_connector::sink::log_store::{
        LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
    };

    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::common::log_store_impl::kv_log_store::test_utils::LogWriterTestExt;

    /// Writes two chunks and a non-checkpoint barrier, then one chunk and a checkpoint barrier,
    /// and checks that the writer task only completes after the reader truncates at the
    /// checkpoint barrier.
    #[tokio::test]
    async fn test_in_memory_log_store() {
        let (mut reader, mut writer) = BoundedInMemLogStoreFactory::new(4).build().await;

        let init_epoch = test_epoch(1);
        let epoch1 = test_epoch(2);
        let epoch2 = test_epoch(3);

        // One row per op kind.
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int64, DataType::Varchar], None);
        let ops = [Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete];
        for (i, op) in ops.into_iter().enumerate() {
            let row = [
                Some(ScalarImpl::Int64(i as i64)),
                Some(ScalarImpl::Utf8(format!("name_{}", i).into_boxed_str())),
            ];
            assert!(builder.append_row(op, row).is_none());
        }
        let stream_chunk = builder.take().unwrap();
        let writer_chunk = stream_chunk.clone();

        // Writer side runs concurrently; the final checkpoint flush blocks until truncation.
        let mut join_handle = tokio::spawn(async move {
            writer
                .init(EpochPair::new_test_epoch(init_epoch), false)
                .await
                .unwrap();
            for _ in 0..2 {
                writer.write_chunk(writer_chunk.clone()).await.unwrap();
            }
            writer
                .flush_current_epoch_for_test(epoch1, false)
                .await
                .unwrap();
            writer.write_chunk(writer_chunk).await.unwrap();
            writer
                .flush_current_epoch_for_test(epoch2, true)
                .await
                .unwrap();
        });

        reader.init().await.unwrap();

        // First chunk of the initial epoch.
        let (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id: _ }) =
            reader.next_item().await.unwrap()
        else {
            unreachable!()
        };
        assert_eq!(epoch, init_epoch);
        assert_eq!(&chunk, &stream_chunk);

        // Second chunk of the initial epoch.
        let (
            epoch,
            LogStoreReadItem::StreamChunk {
                chunk,
                chunk_id: chunk_id1_2,
            },
        ) = reader.next_item().await.unwrap()
        else {
            unreachable!()
        };
        assert_eq!(epoch, init_epoch);
        assert_eq!(&chunk, &stream_chunk);

        // Non-checkpoint barrier closing the initial epoch.
        let (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) =
            reader.next_item().await.unwrap()
        else {
            unreachable!()
        };
        assert!(!is_checkpoint);
        assert_eq!(epoch, init_epoch);

        // Only chunk of `epoch1`.
        let (
            epoch,
            LogStoreReadItem::StreamChunk {
                chunk,
                chunk_id: chunk_id2_1,
            },
        ) = reader.next_item().await.unwrap()
        else {
            unreachable!()
        };
        assert_eq!(&chunk, &stream_chunk);
        assert_eq!(epoch, epoch1);

        // Checkpoint barrier closing `epoch1`.
        let (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) =
            reader.next_item().await.unwrap()
        else {
            unreachable!()
        };
        assert!(is_checkpoint);
        assert_eq!(epoch, epoch1);

        // Truncating at chunks must not unblock the writer.
        reader
            .truncate(TruncateOffset::Chunk {
                epoch: init_epoch,
                chunk_id: chunk_id1_2,
            })
            .unwrap();
        let writer_poll = poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx))).await;
        assert!(writer_poll.is_pending());

        reader
            .truncate(TruncateOffset::Chunk {
                epoch: epoch1,
                chunk_id: chunk_id2_1,
            })
            .unwrap();
        let writer_poll = poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx))).await;
        assert!(writer_poll.is_pending());

        // Truncating at the checkpoint barrier lets the writer finish.
        reader
            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
            .unwrap();
        join_handle.await.unwrap();
    }
}