risingwave_stream/common/log_store_impl/
in_mem.rs1use anyhow::{Context, anyhow};
16use await_tree::InstrumentAwait;
17use futures::FutureExt;
18use futures::future::BoxFuture;
19use risingwave_common::array::StreamChunk;
20use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
21use risingwave_connector::sink::log_store::{
22 FlushCurrentEpochOptions, LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult,
23 LogWriter, LogWriterPostFlushCurrentEpoch, TruncateOffset,
24};
25use tokio::sync::mpsc::{
26 Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
27};
28use tokio::sync::oneshot;
29
30use crate::common::log_store_impl::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};
31use crate::executor::StreamExecutorResult;
32
33enum InMemLogStoreItem {
34 StreamChunk(StreamChunk),
35 Barrier {
36 next_epoch: u64,
37 options: FlushCurrentEpochOptions,
38 },
39}
40
41pub struct BoundedInMemLogStoreWriter {
47 curr_epoch: Option<u64>,
49
50 init_epoch_tx: Option<oneshot::Sender<u64>>,
52
53 item_tx: Sender<InMemLogStoreItem>,
55
56 truncated_epoch_rx: UnboundedReceiver<u64>,
58
59 wait_init_epoch: Option<WaitInitEpochFn>,
60}
61
/// Where the reader stands with respect to the epochs it receives.
#[derive(Debug, PartialEq, Eq)]
enum LogReaderEpochProgress {
    /// Items of the contained epoch are being read.
    Consuming(u64),
    /// A checkpoint barrier sealing `sealed_epoch` was returned by `next_item`;
    /// consumption resumes at `next_epoch` once that barrier is truncated.
    AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 },
}
69
70const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH);
71
72pub struct BoundedInMemLogStoreReader {
73 epoch_progress: LogReaderEpochProgress,
76
77 init_epoch_rx: Option<oneshot::Receiver<u64>>,
79
80 item_rx: Receiver<InMemLogStoreItem>,
82
83 truncated_epoch_tx: UnboundedSender<u64>,
85
86 latest_offset: TruncateOffset,
88
89 truncate_offset: TruncateOffset,
91}
92
93type WaitInitEpochFn =
94 Box<dyn FnOnce(EpochPair) -> BoxFuture<'static, StreamExecutorResult<()>> + Send + 'static>;
95
96pub struct BoundedInMemLogStoreFactory {
97 bound: usize,
98 wait_init_epoch: WaitInitEpochFn,
99}
100
101impl BoundedInMemLogStoreFactory {
102 pub fn new(
103 bound: usize,
104 wait_init_epoch: impl FnOnce(EpochPair) -> BoxFuture<'static, StreamExecutorResult<()>>
105 + Send
106 + 'static,
107 ) -> Self {
108 Self {
109 bound,
110 wait_init_epoch: Box::new(wait_init_epoch),
111 }
112 }
113
114 #[cfg(test)]
115 pub fn for_test(bound: usize) -> Self {
116 Self {
117 bound,
118 wait_init_epoch: Box::new(|_x| std::future::ready(Ok(())).boxed()),
119 }
120 }
121}
122
123impl LogStoreFactory for BoundedInMemLogStoreFactory {
124 type Reader = BoundedInMemLogStoreReader;
125 type Writer = BoundedInMemLogStoreWriter;
126
127 const ALLOW_REWIND: bool = false;
128 const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool = false;
129
130 async fn build(self) -> (Self::Reader, Self::Writer) {
131 let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
132 let (item_tx, item_rx) = channel(self.bound);
133 let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel();
134 let reader = BoundedInMemLogStoreReader {
135 epoch_progress: UNINITIALIZED,
136 init_epoch_rx: Some(init_epoch_rx),
137 item_rx,
138 truncated_epoch_tx,
139 latest_offset: TruncateOffset::Barrier { epoch: 0 },
140 truncate_offset: TruncateOffset::Barrier { epoch: 0 },
141 };
142 let writer = BoundedInMemLogStoreWriter {
143 curr_epoch: None,
144 init_epoch_tx: Some(init_epoch_tx),
145 item_tx,
146 truncated_epoch_rx,
147 wait_init_epoch: Some(self.wait_init_epoch),
148 };
149 (reader, writer)
150 }
151}
152
153impl LogReader for BoundedInMemLogStoreReader {
154 async fn init(&mut self) -> LogStoreResult<()> {
155 let init_epoch_rx = self
156 .init_epoch_rx
157 .take()
158 .expect("should not init for twice");
159 let epoch = init_epoch_rx.await.context("unable to get init epoch")?;
160 assert_eq!(self.epoch_progress, UNINITIALIZED);
161 self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
162 self.latest_offset = TruncateOffset::Barrier {
163 epoch: epoch.prev_epoch(),
164 };
165 self.truncate_offset = TruncateOffset::Barrier {
166 epoch: epoch.prev_epoch(),
167 };
168 Ok(())
169 }
170
171 async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
172 match self.item_rx.recv().await {
173 Some(item) => match self.epoch_progress {
174 Consuming(current_epoch) => match item {
175 InMemLogStoreItem::StreamChunk(chunk) => {
176 let chunk_id = match self.latest_offset {
177 TruncateOffset::Chunk { epoch, chunk_id } => {
178 assert_eq!(epoch, current_epoch);
179 chunk_id + 1
180 }
181 TruncateOffset::Barrier { epoch } => {
182 assert!(
183 epoch < current_epoch,
184 "prev offset at barrier {} but current epoch {}",
185 epoch,
186 current_epoch
187 );
188 0
189 }
190 };
191 self.latest_offset = TruncateOffset::Chunk {
192 epoch: current_epoch,
193 chunk_id,
194 };
195 Ok((
196 current_epoch,
197 LogStoreReadItem::StreamChunk { chunk, chunk_id },
198 ))
199 }
200 InMemLogStoreItem::Barrier {
201 next_epoch,
202 options,
203 } => {
204 if options.is_checkpoint {
205 self.epoch_progress = AwaitingTruncate {
206 next_epoch,
207 sealed_epoch: current_epoch,
208 };
209 } else {
210 self.epoch_progress = Consuming(next_epoch);
211 }
212 self.latest_offset = TruncateOffset::Barrier {
213 epoch: current_epoch,
214 };
215 Ok((
216 current_epoch,
217 LogStoreReadItem::Barrier {
218 is_checkpoint: options.is_checkpoint,
219 new_vnode_bitmap: options.new_vnode_bitmap,
220 is_stop: options.is_stop,
221 add_columns: options.add_columns,
222 },
223 ))
224 }
225 },
226 AwaitingTruncate { .. } => Err(anyhow!(
227 "should not call next_item on checkpoint barrier for in-mem log store"
228 )),
229 },
230 None => Err(anyhow!("end of log stream")),
231 }
232 }
233
234 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
235 if self.truncate_offset >= offset {
237 return Err(anyhow!(
238 "truncate offset {:?} but prev truncate offset is {:?}",
239 offset,
240 self.truncate_offset
241 ));
242 }
243
244 if offset > self.latest_offset {
246 return Err(anyhow!(
247 "truncate at {:?} but latest offset is {:?}",
248 offset,
249 self.latest_offset
250 ));
251 }
252
253 if let AwaitingTruncate {
254 sealed_epoch,
255 next_epoch,
256 } = &self.epoch_progress
257 && let TruncateOffset::Barrier { epoch } = offset
258 && epoch == *sealed_epoch
259 {
260 let sealed_epoch = *sealed_epoch;
261 self.epoch_progress = Consuming(*next_epoch);
262 self.truncated_epoch_tx
263 .send(sealed_epoch)
264 .map_err(|_| anyhow!("unable to send sealed epoch"))?;
265 }
266 self.truncate_offset = offset;
267 Ok(())
268 }
269
270 async fn rewind(&mut self) -> LogStoreResult<()> {
271 Err(anyhow!("should not call rewind on it"))
272 }
273
274 async fn start_from(&mut self, _start_offset: Option<u64>) -> LogStoreResult<()> {
275 Ok(())
276 }
277}
278
279impl LogWriter for BoundedInMemLogStoreWriter {
280 async fn init(
281 &mut self,
282 epoch: EpochPair,
283 _pause_read_on_bootstrap: bool,
284 ) -> LogStoreResult<()> {
285 let init_epoch_tx = self.init_epoch_tx.take().expect("cannot be init for twice");
286 self.wait_init_epoch
287 .take()
288 .expect("cannot be init for in-mem log store")(epoch)
289 .await?;
290 init_epoch_tx
291 .send(epoch.curr)
292 .map_err(|_| anyhow!("unable to send init epoch"))?;
293 self.curr_epoch = Some(epoch.curr);
294 Ok(())
295 }
296
297 async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
298 self.item_tx
299 .send(InMemLogStoreItem::StreamChunk(chunk))
300 .instrument_await("in_mem_send_item_chunk")
301 .await
302 .map_err(|_| anyhow!("unable to send stream chunk"))?;
303 Ok(())
304 }
305
306 async fn flush_current_epoch(
307 &mut self,
308 next_epoch: u64,
309 options: FlushCurrentEpochOptions,
310 ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
311 let is_checkpoint = options.is_checkpoint;
312 self.item_tx
313 .send(InMemLogStoreItem::Barrier {
314 next_epoch,
315 options,
316 })
317 .instrument_await("in_mem_send_item_barrier")
318 .await
319 .map_err(|_| anyhow!("unable to send barrier"))?;
320
321 let prev_epoch = self
322 .curr_epoch
323 .replace(next_epoch)
324 .expect("should have epoch");
325
326 if is_checkpoint {
327 let truncated_epoch = self
328 .truncated_epoch_rx
329 .recv()
330 .instrument_await("in_mem_recv_truncated_epoch")
331 .await
332 .ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
333 assert_eq!(truncated_epoch, prev_epoch);
334 }
335
336 Ok(LogWriterPostFlushCurrentEpoch::new(move || {
337 async move { Ok(()) }.boxed()
338 }))
339 }
340
341 fn pause(&mut self) -> LogStoreResult<()> {
342 Ok(())
344 }
345
346 fn resume(&mut self) -> LogStoreResult<()> {
347 Ok(())
349 }
350}
351
#[cfg(test)]
mod tests {
    use std::future::poll_fn;
    use std::task::Poll;

    use futures::FutureExt;
    use risingwave_common::array::{Op, StreamChunkBuilder};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_connector::sink::log_store::{
        LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
    };

    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::common::log_store_impl::kv_log_store::test_utils::LogWriterTestExt;

    /// Drives a writer through a non-checkpoint and a checkpoint epoch and checks
    /// that the writer stays blocked on the checkpoint flush until the reader
    /// truncates the checkpoint barrier.
    #[tokio::test]
    async fn test_in_memory_log_store() {
        let factory = BoundedInMemLogStoreFactory::for_test(4);
        let (mut reader, mut writer) = factory.build().await;

        let init_epoch = test_epoch(1);
        let epoch1 = test_epoch(2);
        let epoch2 = test_epoch(3);

        // A four-row chunk covering every op kind.
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int64, DataType::Varchar], None);
        let all_ops = [Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete];
        for (idx, op) in all_ops.into_iter().enumerate() {
            let row = [
                Some(ScalarImpl::Int64(idx as i64)),
                Some(ScalarImpl::Utf8(format!("name_{}", idx).into_boxed_str())),
            ];
            assert!(builder.append_row(op, row).is_none());
        }
        let stream_chunk = builder.take().unwrap();
        let writer_chunk = stream_chunk.clone();

        let mut join_handle = tokio::spawn(async move {
            writer
                .init(EpochPair::new_test_epoch(init_epoch), false)
                .await
                .unwrap();
            for _ in 0..2 {
                writer.write_chunk(writer_chunk.clone()).await.unwrap();
            }
            writer
                .flush_current_epoch_for_test(epoch1, false)
                .await
                .unwrap();
            writer.write_chunk(writer_chunk).await.unwrap();
            writer
                .flush_current_epoch_for_test(epoch2, true)
                .await
                .unwrap();
        });

        // Asserts the item is the expected chunk in `expected_epoch`; yields its id.
        let expect_chunk =
            |(epoch, item): (u64, LogStoreReadItem), expected_epoch: u64| match item {
                LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                    assert_eq!(epoch, expected_epoch);
                    assert_eq!(&chunk, &stream_chunk);
                    chunk_id
                }
                _ => unreachable!(),
            };
        // Asserts the item is a barrier of `expected_epoch` with the given kind.
        let expect_barrier = |(epoch, item): (u64, LogStoreReadItem),
                              expected_epoch: u64,
                              expected_checkpoint: bool| match item {
            LogStoreReadItem::Barrier { is_checkpoint, .. } => {
                assert_eq!(is_checkpoint, expected_checkpoint);
                assert_eq!(epoch, expected_epoch);
            }
            _ => unreachable!(),
        };

        reader.init().await.unwrap();
        let _first_chunk_id = expect_chunk(reader.next_item().await.unwrap(), init_epoch);
        let second_chunk_id = expect_chunk(reader.next_item().await.unwrap(), init_epoch);
        expect_barrier(reader.next_item().await.unwrap(), init_epoch, false);
        let epoch1_chunk_id = expect_chunk(reader.next_item().await.unwrap(), epoch1);
        expect_barrier(reader.next_item().await.unwrap(), epoch1, true);

        // Chunk-level truncation must not release the writer's checkpoint flush.
        reader
            .truncate(TruncateOffset::Chunk {
                epoch: init_epoch,
                chunk_id: second_chunk_id,
            })
            .unwrap();
        assert!(
            poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
                .await
                .is_pending()
        );

        reader
            .truncate(TruncateOffset::Chunk {
                epoch: epoch1,
                chunk_id: epoch1_chunk_id,
            })
            .unwrap();
        assert!(
            poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
                .await
                .is_pending()
        );

        // Truncating the checkpoint barrier itself lets the writer finish.
        reader
            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
            .unwrap();
        join_handle.await.unwrap();
    }
}