risingwave_stream/common/log_store_impl/
in_mem.rs1use anyhow::{Context, anyhow};
16use await_tree::InstrumentAwait;
17use futures::FutureExt;
18use risingwave_common::array::StreamChunk;
19use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
20use risingwave_connector::sink::log_store::{
21 FlushCurrentEpochOptions, LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult,
22 LogWriter, LogWriterPostFlushCurrentEpoch, TruncateOffset,
23};
24use tokio::sync::mpsc::{
25 Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
26};
27use tokio::sync::oneshot;
28
29use crate::common::log_store_impl::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};
30
/// An item handed from the log writer to the log reader over the in-memory channel.
enum InMemLogStoreItem {
    /// A chunk of stream data, sent by the writer's `write_chunk`.
    StreamChunk(StreamChunk),
    /// A barrier that closes the writer's current epoch, sent by `flush_current_epoch`.
    Barrier {
        /// The epoch the writer moves on to after this barrier.
        next_epoch: u64,
        /// Flush options forwarded to the reader; the reader reads `is_checkpoint`,
        /// `new_vnode_bitmap` and `is_stop` from it.
        options: FlushCurrentEpochOptions,
    },
}
38
/// Writer half of the bounded in-memory log store.
///
/// Chunks and barriers are pushed to the paired [`BoundedInMemLogStoreReader`] through a
/// bounded channel; on checkpoint barriers the writer waits until the reader reports that
/// the epoch has been truncated.
pub struct BoundedInMemLogStoreWriter {
    /// Epoch currently being written. `None` until `init` has been called.
    curr_epoch: Option<u64>,

    /// One-shot sender used by `init` to pass the initial epoch to the reader.
    /// Taken (left as `None`) on the first `init` call.
    init_epoch_tx: Option<oneshot::Sender<u64>>,

    /// Bounded channel over which chunks and barriers are sent to the reader.
    item_tx: Sender<InMemLogStoreItem>,

    /// Receives the epochs whose checkpoint barrier the reader has truncated.
    truncated_epoch_rx: UnboundedReceiver<u64>,
}
57
/// Reader-side progress with respect to epoch boundaries.
#[derive(Eq, PartialEq, Debug)]
enum LogReaderEpochProgress {
    /// The reader is consuming items that belong to the contained epoch.
    Consuming(u64),
    /// A checkpoint barrier sealing `sealed_epoch` has been read. The reader stays in this
    /// state until it is truncated at that barrier, after which it consumes `next_epoch`.
    AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 },
}
65
/// Placeholder progress held by a reader before `init`; `init` asserts the reader is still
/// in this state before installing the real initial epoch.
const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH);
67
/// Reader half of the bounded in-memory log store.
///
/// Receives items from the paired [`BoundedInMemLogStoreWriter`] and reports truncated
/// checkpoint epochs back to it.
pub struct BoundedInMemLogStoreReader {
    /// Where the reader currently stands: consuming an epoch, or waiting to be truncated
    /// at a checkpoint barrier.
    epoch_progress: LogReaderEpochProgress,

    /// One-shot receiver for the initial epoch sent by the writer's `init`.
    /// Taken (left as `None`) on the first `init` call.
    init_epoch_rx: Option<oneshot::Receiver<u64>>,

    /// Bounded channel from which chunks and barriers are received.
    item_rx: Receiver<InMemLogStoreItem>,

    /// Sends the sealed epoch back to the writer once the reader has been truncated at
    /// that epoch's checkpoint barrier.
    truncated_epoch_tx: UnboundedSender<u64>,

    /// Offset of the item most recently returned by `next_item`; `truncate` rejects any
    /// offset beyond it.
    latest_offset: TruncateOffset,

    /// Offset of the last accepted `truncate` call; later truncations must be strictly
    /// greater.
    truncate_offset: TruncateOffset,
}
88
/// Factory that builds a connected in-memory log store reader/writer pair.
pub struct BoundedInMemLogStoreFactory {
    /// Capacity passed to the bounded item channel created in `build`.
    bound: usize,
}
92
93impl BoundedInMemLogStoreFactory {
94 pub fn new(bound: usize) -> Self {
95 Self { bound }
96 }
97}
98
99impl LogStoreFactory for BoundedInMemLogStoreFactory {
100 type Reader = BoundedInMemLogStoreReader;
101 type Writer = BoundedInMemLogStoreWriter;
102
103 const ALLOW_REWIND: bool = false;
104 const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool = false;
105
106 async fn build(self) -> (Self::Reader, Self::Writer) {
107 let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
108 let (item_tx, item_rx) = channel(self.bound);
109 let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel();
110 let reader = BoundedInMemLogStoreReader {
111 epoch_progress: UNINITIALIZED,
112 init_epoch_rx: Some(init_epoch_rx),
113 item_rx,
114 truncated_epoch_tx,
115 latest_offset: TruncateOffset::Barrier { epoch: 0 },
116 truncate_offset: TruncateOffset::Barrier { epoch: 0 },
117 };
118 let writer = BoundedInMemLogStoreWriter {
119 curr_epoch: None,
120 init_epoch_tx: Some(init_epoch_tx),
121 item_tx,
122 truncated_epoch_rx,
123 };
124 (reader, writer)
125 }
126}
127
128impl LogReader for BoundedInMemLogStoreReader {
129 async fn init(&mut self) -> LogStoreResult<()> {
130 let init_epoch_rx = self
131 .init_epoch_rx
132 .take()
133 .expect("should not init for twice");
134 let epoch = init_epoch_rx.await.context("unable to get init epoch")?;
135 assert_eq!(self.epoch_progress, UNINITIALIZED);
136 self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
137 self.latest_offset = TruncateOffset::Barrier {
138 epoch: epoch.prev_epoch(),
139 };
140 self.truncate_offset = TruncateOffset::Barrier {
141 epoch: epoch.prev_epoch(),
142 };
143 Ok(())
144 }
145
146 async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
147 match self.item_rx.recv().await {
148 Some(item) => match self.epoch_progress {
149 Consuming(current_epoch) => match item {
150 InMemLogStoreItem::StreamChunk(chunk) => {
151 let chunk_id = match self.latest_offset {
152 TruncateOffset::Chunk { epoch, chunk_id } => {
153 assert_eq!(epoch, current_epoch);
154 chunk_id + 1
155 }
156 TruncateOffset::Barrier { epoch } => {
157 assert!(
158 epoch < current_epoch,
159 "prev offset at barrier {} but current epoch {}",
160 epoch,
161 current_epoch
162 );
163 0
164 }
165 };
166 self.latest_offset = TruncateOffset::Chunk {
167 epoch: current_epoch,
168 chunk_id,
169 };
170 Ok((
171 current_epoch,
172 LogStoreReadItem::StreamChunk { chunk, chunk_id },
173 ))
174 }
175 InMemLogStoreItem::Barrier {
176 next_epoch,
177 options,
178 } => {
179 if options.is_checkpoint {
180 self.epoch_progress = AwaitingTruncate {
181 next_epoch,
182 sealed_epoch: current_epoch,
183 };
184 } else {
185 self.epoch_progress = Consuming(next_epoch);
186 }
187 self.latest_offset = TruncateOffset::Barrier {
188 epoch: current_epoch,
189 };
190 Ok((
191 current_epoch,
192 LogStoreReadItem::Barrier {
193 is_checkpoint: options.is_checkpoint,
194 new_vnode_bitmap: options.new_vnode_bitmap,
195 is_stop: options.is_stop,
196 },
197 ))
198 }
199 },
200 AwaitingTruncate { .. } => Err(anyhow!(
201 "should not call next_item on checkpoint barrier for in-mem log store"
202 )),
203 },
204 None => Err(anyhow!("end of log stream")),
205 }
206 }
207
208 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
209 if self.truncate_offset >= offset {
211 return Err(anyhow!(
212 "truncate offset {:?} but prev truncate offset is {:?}",
213 offset,
214 self.truncate_offset
215 ));
216 }
217
218 if offset > self.latest_offset {
220 return Err(anyhow!(
221 "truncate at {:?} but latest offset is {:?}",
222 offset,
223 self.latest_offset
224 ));
225 }
226
227 if let AwaitingTruncate {
228 sealed_epoch,
229 next_epoch,
230 } = &self.epoch_progress
231 {
232 if let TruncateOffset::Barrier { epoch } = offset
233 && epoch == *sealed_epoch
234 {
235 let sealed_epoch = *sealed_epoch;
236 self.epoch_progress = Consuming(*next_epoch);
237 self.truncated_epoch_tx
238 .send(sealed_epoch)
239 .map_err(|_| anyhow!("unable to send sealed epoch"))?;
240 }
241 }
242 self.truncate_offset = offset;
243 Ok(())
244 }
245
246 async fn rewind(&mut self) -> LogStoreResult<()> {
247 Err(anyhow!("should not call rewind on it"))
248 }
249
250 async fn start_from(&mut self, _start_offset: Option<u64>) -> LogStoreResult<()> {
251 Ok(())
252 }
253}
254
255impl LogWriter for BoundedInMemLogStoreWriter {
256 async fn init(
257 &mut self,
258 epoch: EpochPair,
259 _pause_read_on_bootstrap: bool,
260 ) -> LogStoreResult<()> {
261 let init_epoch_tx = self.init_epoch_tx.take().expect("cannot be init for twice");
262 init_epoch_tx
263 .send(epoch.curr)
264 .map_err(|_| anyhow!("unable to send init epoch"))?;
265 self.curr_epoch = Some(epoch.curr);
266 Ok(())
267 }
268
269 async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
270 self.item_tx
271 .send(InMemLogStoreItem::StreamChunk(chunk))
272 .instrument_await("in_mem_send_item_chunk")
273 .await
274 .map_err(|_| anyhow!("unable to send stream chunk"))?;
275 Ok(())
276 }
277
278 async fn flush_current_epoch(
279 &mut self,
280 next_epoch: u64,
281 options: FlushCurrentEpochOptions,
282 ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
283 let is_checkpoint = options.is_checkpoint;
284 self.item_tx
285 .send(InMemLogStoreItem::Barrier {
286 next_epoch,
287 options,
288 })
289 .instrument_await("in_mem_send_item_barrier")
290 .await
291 .map_err(|_| anyhow!("unable to send barrier"))?;
292
293 let prev_epoch = self
294 .curr_epoch
295 .replace(next_epoch)
296 .expect("should have epoch");
297
298 if is_checkpoint {
299 let truncated_epoch = self
300 .truncated_epoch_rx
301 .recv()
302 .instrument_await("in_mem_recv_truncated_epoch")
303 .await
304 .ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
305 assert_eq!(truncated_epoch, prev_epoch);
306 }
307
308 Ok(LogWriterPostFlushCurrentEpoch::new(move || {
309 async move { Ok(()) }.boxed()
310 }))
311 }
312
313 fn pause(&mut self) -> LogStoreResult<()> {
314 Ok(())
316 }
317
318 fn resume(&mut self) -> LogStoreResult<()> {
319 Ok(())
321 }
322}
323
#[cfg(test)]
mod tests {
    use std::future::poll_fn;
    use std::task::Poll;

    use futures::FutureExt;
    use risingwave_common::array::{Op, StreamChunkBuilder};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::epoch::{EpochPair, test_epoch};
    use risingwave_connector::sink::log_store::{
        LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
    };

    use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
    use crate::common::log_store_impl::kv_log_store::test_utils::LogWriterTestExt;

    /// Writes two chunks and a non-checkpoint barrier, then one chunk and a checkpoint
    /// barrier, and checks that the reader sees them in order and that the writer task only
    /// finishes once the reader truncates at the checkpoint barrier.
    #[tokio::test]
    async fn test_in_memory_log_store() {
        let (mut reader, mut writer) = BoundedInMemLogStoreFactory::new(4).build().await;

        let init_epoch = test_epoch(1);
        let epoch1 = test_epoch(2);
        let epoch2 = test_epoch(3);

        // Build a four-row chunk with one row per op kind.
        let mut builder =
            StreamChunkBuilder::unlimited(vec![DataType::Int64, DataType::Varchar], None);
        let ops = vec![Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete];
        for (i, op) in ops.into_iter().enumerate() {
            let row = [
                Some(ScalarImpl::Int64(i as i64)),
                Some(ScalarImpl::Utf8(format!("name_{}", i).into_boxed_str())),
            ];
            let flushed = builder.append_row(op, row);
            assert!(flushed.is_none());
        }
        let stream_chunk = builder.take().unwrap();
        let chunk_for_writer = stream_chunk.clone();

        // The writer runs in its own task so that it can block on the checkpoint barrier.
        let mut writer_task = tokio::spawn(async move {
            writer
                .init(EpochPair::new_test_epoch(init_epoch), false)
                .await
                .unwrap();
            for _ in 0..2 {
                writer
                    .write_chunk(chunk_for_writer.clone())
                    .await
                    .unwrap();
            }
            writer
                .flush_current_epoch_for_test(epoch1, false)
                .await
                .unwrap();
            writer.write_chunk(chunk_for_writer).await.unwrap();
            writer
                .flush_current_epoch_for_test(epoch2, true)
                .await
                .unwrap();
        });

        reader.init().await.unwrap();

        // First epoch: two chunks followed by a non-checkpoint barrier.
        let _chunk_id1_1 = {
            let (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) =
                reader.next_item().await.unwrap()
            else {
                unreachable!()
            };
            assert_eq!(epoch, init_epoch);
            assert_eq!(&chunk, &stream_chunk);
            chunk_id
        };

        let chunk_id1_2 = {
            let (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) =
                reader.next_item().await.unwrap()
            else {
                unreachable!()
            };
            assert_eq!(epoch, init_epoch);
            assert_eq!(&chunk, &stream_chunk);
            chunk_id
        };

        {
            let (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) =
                reader.next_item().await.unwrap()
            else {
                unreachable!()
            };
            assert!(!is_checkpoint);
            assert_eq!(epoch, init_epoch);
        }

        // Second epoch: one chunk followed by a checkpoint barrier.
        let chunk_id2_1 = {
            let (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) =
                reader.next_item().await.unwrap()
            else {
                unreachable!()
            };
            assert_eq!(&chunk, &stream_chunk);
            assert_eq!(epoch, epoch1);
            chunk_id
        };

        {
            let (epoch, LogStoreReadItem::Barrier { is_checkpoint, .. }) =
                reader.next_item().await.unwrap()
            else {
                unreachable!()
            };
            assert!(is_checkpoint);
            assert_eq!(epoch, epoch1);
        }

        // Truncating at chunk offsets must not release the writer.
        let first_truncate = TruncateOffset::Chunk {
            epoch: init_epoch,
            chunk_id: chunk_id1_2,
        };
        reader.truncate(first_truncate).unwrap();
        assert!(
            poll_fn(|cx| Poll::Ready(writer_task.poll_unpin(cx)))
                .await
                .is_pending()
        );

        let second_truncate = TruncateOffset::Chunk {
            epoch: epoch1,
            chunk_id: chunk_id2_1,
        };
        reader.truncate(second_truncate).unwrap();
        assert!(
            poll_fn(|cx| Poll::Ready(writer_task.poll_unpin(cx)))
                .await
                .is_pending()
        );

        // Truncating at the checkpoint barrier lets the writer task run to completion.
        reader
            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
            .unwrap();
        writer_task.await.unwrap();
    }
}