1use std::cmp::Ordering;
16use std::collections::VecDeque;
17use std::fmt::Debug;
18use std::future::{Future, pending, poll_fn};
19use std::pin::pin;
20use std::sync::Arc;
21use std::task::Poll;
22use std::time::Instant;
23
24use await_tree::InstrumentAwait;
25use futures::future::BoxFuture;
26use futures::{TryFuture, TryFutureExt};
27use risingwave_common::array::StreamChunk;
28use risingwave_common::bail;
29use risingwave_common::bitmap::Bitmap;
30use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
31use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
32use risingwave_common_estimate_size::EstimateSize;
33use risingwave_common_rate_limit::{RateLimit, RateLimiter};
34use risingwave_pb::stream_plan::PbSinkSchemaChange;
35use tokio::select;
36use tokio::sync::mpsc::UnboundedReceiver;
37
/// Result alias used throughout the log store; all errors are reported as `anyhow::Error`.
pub type LogStoreResult<T> = Result<T, anyhow::Error>;
/// Identifier of a chunk within an epoch; ids increase within the epoch and
/// restart from 0 after a barrier (see `TruncateOffset::next_chunk_id`).
pub type ChunkId = usize;

/// Offset up to which (inclusive) a log store may reclaim consumed data.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum TruncateOffset {
    /// Up to and including chunk `chunk_id` of `epoch`.
    Chunk { epoch: u64, chunk_id: ChunkId },
    /// Up to and including the barrier of `epoch`.
    Barrier { epoch: u64 },
}
46
47impl Ord for TruncateOffset {
48 fn cmp(&self, other: &Self) -> Ordering {
49 let extract = |offset: &TruncateOffset| match offset {
50 TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
51 TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
52 };
53 extract(self).cmp(&extract(other))
54 }
55}
56
impl PartialOrd for TruncateOffset {
    // Delegate to the total order defined in `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
62
63impl TruncateOffset {
64 pub fn next_chunk_id(&self) -> ChunkId {
65 match self {
66 TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
67 TruncateOffset::Barrier { .. } => 0,
68 }
69 }
70
71 pub fn epoch(&self) -> u64 {
72 match self {
73 TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
74 }
75 }
76
77 pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
78 if *self >= next_offset {
79 bail!(
80 "next offset {:?} should be later than current offset {:?}",
81 next_offset,
82 self
83 )
84 } else {
85 Ok(())
86 }
87 }
88
89 pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
90 match self {
91 TruncateOffset::Chunk {
92 epoch: offset_epoch,
93 ..
94 } => {
95 if epoch != *offset_epoch {
96 bail!(
97 "new item epoch {} does not match current chunk offset epoch {}",
98 epoch,
99 offset_epoch
100 );
101 }
102 }
103 TruncateOffset::Barrier {
104 epoch: offset_epoch,
105 } => {
106 if epoch <= *offset_epoch {
107 bail!(
108 "new item epoch {} does not exceed barrier offset epoch {}",
109 epoch,
110 offset_epoch
111 );
112 }
113 }
114 }
115 Ok(())
116 }
117}
118
/// An item yielded by a [`LogReader`].
#[derive(Debug)]
pub enum LogStoreReadItem {
    StreamChunk {
        chunk: StreamChunk,
        // Chunk id within the current epoch.
        chunk_id: ChunkId,
    },
    Barrier {
        is_checkpoint: bool,
        // Set when the vnode bitmap changes at this barrier.
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        // Set when this is the last barrier before stopping.
        is_stop: bool,
        // Schema change carried by this barrier, if any.
        schema_change: Option<PbSinkSchemaChange>,
    },
}

/// Callback type returned by [`LogWriter::flush_current_epoch`], to be run
/// after the barrier of the flushed epoch has been yielded downstream.
pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;

/// Wrapper around the post-flush callback; `#[must_use]` so callers cannot
/// silently drop it without invoking [`Self::post_yield_barrier`].
#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);
139
140impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
141 pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
142 Self(Box::new(f))
143 }
144
145 pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
146 self.0().await
147 }
148}
149
/// Options describing the barrier that ends the epoch being flushed.
pub struct FlushCurrentEpochOptions {
    pub is_checkpoint: bool,
    // New vnode bitmap when the actor is rescheduled at this barrier.
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    // Whether the actor stops after this barrier.
    pub is_stop: bool,
    // Schema change carried by this barrier, if any.
    pub schema_change: Option<PbSinkSchemaChange>,
}
156
/// Write side of a log store.
pub trait LogWriter: Send {
    /// Initialize the writer with the first epoch to write.
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Write a chunk into the current epoch.
    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Seal the current epoch and advance to `next_epoch`. The returned
    /// callback must be invoked after the barrier has been yielded downstream.
    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    /// Pause the paired reader.
    fn pause(&mut self) -> LogStoreResult<()>;

    /// Resume a previously paused reader.
    fn resume(&mut self) -> LogStoreResult<()>;
}

/// Read side of a log store.
pub trait LogReader: Send + Sized + 'static {
    /// Initialize the reader; must be called before other methods.
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Restart reading from `start_offset` (semantics of `None` are
    /// implementation-defined — presumably "from the beginning"; confirm with
    /// implementations).
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Yield the next item together with its epoch.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark everything up to and including `offset` as consumed so the log
    /// store may reclaim it.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    /// Reset the reader so that unconsumed items are replayed. Only supported
    /// when the factory reports `ALLOW_REWIND`.
    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}

/// Factory that builds a paired reader and writer over the same log store.
pub trait LogStoreFactory: Send + 'static {
    /// Whether the produced reader supports `rewind`.
    const ALLOW_REWIND: bool;
    /// Whether the sink must be rebuilt when the vnode bitmap is updated.
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
218
/// Reader adapter that applies `f` to every chunk yielded by `inner`.
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}

/// Reader adapter that remembers yielded barriers so that a later truncate
/// also truncates those barriers on the inner reader, in order.
pub struct TruncateBarrierLogReader<R: LogReader> {
    inner: R,
    // Epochs of barriers yielded but not yet truncated, oldest at the front.
    pending_barriers: VecDeque<u64>,
}
228
229impl<R: LogReader> TruncateBarrierLogReader<R> {
230 pub fn new(inner: R) -> Self {
231 Self {
232 inner,
233 pending_barriers: VecDeque::new(),
234 }
235 }
236}
237
238impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
239 for TransformChunkLogReader<F, R>
240{
241 fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
242 self.inner.init()
243 }
244
245 async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
246 let (epoch, item) = self.inner.next_item().await?;
247 let item = match item {
248 LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
249 chunk: (self.f)(chunk),
250 chunk_id,
251 },
252 other => other,
253 };
254 Ok((epoch, item))
255 }
256
257 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
258 self.inner.truncate(offset)
259 }
260
261 fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
262 self.inner.rewind()
263 }
264
265 fn start_from(
266 &mut self,
267 start_offset: Option<u64>,
268 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
269 self.inner.start_from(start_offset)
270 }
271}
272
impl<R: LogReader> LogReader for TruncateBarrierLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        // Drop barriers remembered from a previous run.
        self.pending_barriers.clear();
        self.inner.init().await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let (epoch, item) = self.inner.next_item().await?;
        // Remember every yielded barrier so `truncate` can forward it later.
        if matches!(item, LogStoreReadItem::Barrier { .. }) {
            self.pending_barriers.push_back(epoch);
        }
        Ok((epoch, item))
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        // First truncate any pending barriers covered by `offset` (oldest
        // first), then forward `offset` itself; inner truncate calls are thus
        // issued in increasing offset order.
        while let Some(front_epoch) = self.pending_barriers.front().copied() {
            match (TruncateOffset::Barrier { epoch: front_epoch }).cmp(&offset) {
                Ordering::Less => {
                    // Barrier strictly before `offset`: truncate it and keep scanning.
                    self.inner
                        .truncate(TruncateOffset::Barrier { epoch: front_epoch })?;
                    self.pending_barriers.pop_front();
                }
                Ordering::Equal => {
                    // `offset` is exactly this barrier: truncate it and stop.
                    self.inner.truncate(offset)?;
                    self.pending_barriers.pop_front();
                    return Ok(());
                }
                Ordering::Greater => {
                    // `offset` precedes the oldest pending barrier: just forward it.
                    self.inner.truncate(offset)?;
                    return Ok(());
                }
            }
        }
        // No pending barrier left: forward the offset directly.
        self.inner.truncate(offset)?;
        Ok(())
    }

    async fn rewind(&mut self) -> LogStoreResult<()> {
        self.pending_barriers.clear();
        self.inner.rewind().await
    }

    async fn start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
        self.pending_barriers.clear();
        self.inner.start_from(start_offset).await
    }
}
320
/// Reader adapter that measures the time between one `next_item` completing
/// and the next call starting, i.e. how long the consumer keeps the reader idle.
pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    // Instant when the previous `next_item` future completed; `None` before
    // the first item and after init/rewind.
    wait_new_future_start_time: Option<Instant>,
    wait_new_future_duration_ns: LabelGuardedIntCounter,
}
327
328impl<R: LogReader> BackpressureMonitoredLogReader<R> {
329 fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter) -> Self {
330 Self {
331 inner,
332 wait_new_future_start_time: None,
333 wait_new_future_duration_ns,
334 }
335 }
336}
337
impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        // Time elapsed since the previous item was yielded counts as waiting
        // for the consumer to ask for a new item (backpressure).
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        self.inner.next_item().inspect_ok(|_| {
            // Restart the timer once an item has been successfully yielded.
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // NOTE(review): unlike `init`/`rewind`, this does not reset
        // `wait_new_future_start_time` — confirm whether that is intentional.
        self.inner.start_from(start_offset)
    }
}
374
/// Reader adapter recording read-epoch, row-count and byte-count metrics.
pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    // Latest epoch seen from `next_item`; avoids redundant gauge updates.
    read_epoch: u64,
    metrics: LogReaderMetrics,
}

/// Metrics used by [`MonitoredLogReader`] and [`BackpressureMonitoredLogReader`].
pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge,
    pub log_store_read_rows: LabelGuardedIntCounter,
    pub log_store_read_bytes: LabelGuardedIntCounter,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter,
}
387
388impl<R: LogReader> MonitoredLogReader<R> {
389 pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
390 Self {
391 inner,
392 read_epoch: INVALID_EPOCH,
393 metrics,
394 }
395 }
396}
397
398impl<R: LogReader> LogReader for MonitoredLogReader<R> {
399 async fn init(&mut self) -> LogStoreResult<()> {
400 self.inner.init().instrument_await("log_reader_init").await
401 }
402
403 async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
404 self.inner
405 .next_item()
406 .instrument_await("log_reader_next_item")
407 .await
408 .inspect(|(epoch, item)| {
409 if self.read_epoch != *epoch {
410 self.read_epoch = *epoch;
411 self.metrics.log_store_latest_read_epoch.set(*epoch as _);
412 }
413 if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
414 self.metrics
415 .log_store_read_rows
416 .inc_by(chunk.cardinality() as _);
417 self.metrics
418 .log_store_read_bytes
419 .inc_by(chunk.estimated_size() as u64);
420 }
421 })
422 }
423
424 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
425 self.inner.truncate(offset)
426 }
427
428 fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
429 self.inner.rewind().instrument_await("log_reader_rewind")
430 }
431
432 fn start_from(
433 &mut self,
434 start_offset: Option<u64>,
435 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
436 self.inner.start_from(start_offset)
437 }
438}
439
/// Offset in the coordinates of the wrapped (upstream) reader.
#[derive(Copy, Clone, PartialOrd, PartialEq, Debug)]
struct UpstreamChunkOffset(TruncateOffset);
/// Offset in the coordinates exposed to the consumer (downstream); chunk ids
/// are re-assigned because one upstream chunk may be split into several.
#[derive(Copy, Clone, PartialOrd, PartialEq)]
struct DownstreamChunkOffset(TruncateOffset);

struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    // The upstream chunk currently being served piecemeal: (upstream offset,
    // downstream offsets already served with newest at the front, remaining
    // split chunks stored reversed so the next piece is popped from the back).
    consuming_chunk: Option<(
        UpstreamChunkOffset,
        VecDeque<DownstreamChunkOffset>,
        Vec<StreamChunk>,
    )>,
    // Fully served upstream items (newest at the front) paired with the
    // downstream offsets they were served as; drained by `truncate`.
    consumed_offset_queue: VecDeque<(UpstreamChunkOffset, VecDeque<DownstreamChunkOffset>)>,
    // Next downstream chunk id to assign.
    next_chunk_id: usize,
    rate_limiter: RateLimiter,
}

/// Reader adapter that throttles chunk emission according to a dynamically
/// updatable [`RateLimit`], splitting oversized chunks when necessary.
pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    // Receives rate-limit updates at runtime.
    control_rx: UnboundedReceiver<RateLimit>,
}
463
464impl<R: LogReader> RateLimitedLogReader<R> {
465 pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
466 Self {
467 core: RateLimitedLogReaderCore {
468 inner,
469 consuming_chunk: None,
470 consumed_offset_queue: VecDeque::new(),
471 next_chunk_id: 0,
472 rate_limiter: RateLimiter::new(RateLimit::Disabled),
473 },
474 control_rx,
475 }
476 }
477}
478
impl<R: LogReader> RateLimitedLogReaderCore<R> {
    // Next split piece to serve, if an upstream chunk is being consumed.
    // Pieces are stored reversed, so the next one is at the back (`last`).
    fn peek_next_pending_chunk(&self) -> Option<&StreamChunk> {
        self.consuming_chunk
            .as_ref()
            .and_then(|(_, _, chunk)| chunk.last())
    }

    // Pop the next split piece, assign it a fresh downstream chunk id, and
    // move the upstream entry to `consumed_offset_queue` once fully served.
    fn consume_next_pending_chunk(&mut self) -> Option<(u64, StreamChunk, ChunkId)> {
        let Some((upstream_offset, consumed_offsets, pending_chunk)) = &mut self.consuming_chunk
        else {
            return None;
        };
        let epoch = upstream_offset.0.epoch();

        let item = pending_chunk.pop().map(|chunk| {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            // Record the downstream offset this piece was served as (newest at front).
            consumed_offsets.push_front(DownstreamChunkOffset(TruncateOffset::Chunk {
                epoch,
                chunk_id,
            }));
            (epoch, chunk, chunk_id)
        });
        if pending_chunk.is_empty() {
            // Upstream chunk fully served: move its bookkeeping to the consumed queue.
            let (upstream_offset, consumed_offsets, _) =
                self.consuming_chunk.take().expect("checked some");
            self.consumed_offset_queue
                .push_front((upstream_offset, consumed_offsets));
        }
        item
    }

    // Serve an upstream item as-is (no splitting), re-assigning the downstream
    // chunk id and recording the upstream/downstream offset mapping.
    fn consume_single_upstream_item(
        &mut self,
        epoch: u64,
        mut item: LogStoreReadItem,
    ) -> (u64, LogStoreReadItem) {
        assert!(self.consuming_chunk.is_none());
        let (upstream_offset, downstream_offset) = match &mut item {
            LogStoreReadItem::StreamChunk { chunk_id, .. } => {
                let upstream_chunk_id = *chunk_id;
                let downstream_chunk_id = self.next_chunk_id;
                self.next_chunk_id += 1;
                // Rewrite the chunk id to the downstream numbering.
                *chunk_id = downstream_chunk_id;
                (
                    UpstreamChunkOffset(TruncateOffset::Chunk {
                        epoch,
                        chunk_id: upstream_chunk_id,
                    }),
                    DownstreamChunkOffset(TruncateOffset::Chunk {
                        epoch,
                        chunk_id: downstream_chunk_id,
                    }),
                )
            }
            LogStoreReadItem::Barrier { .. } => (
                UpstreamChunkOffset(TruncateOffset::Barrier { epoch }),
                DownstreamChunkOffset(TruncateOffset::Barrier { epoch }),
            ),
        };
        self.consumed_offset_queue
            .push_front((upstream_offset, VecDeque::from_iter([downstream_offset])));
        (epoch, item)
    }

    // Yield the next downstream item under the current rate limit:
    // - Pause: block forever (caller re-enters on limit updates).
    // - Disabled: pass through, draining any previously split chunk first.
    // - Fixed(limit): split chunks that need >= `limit` permits and wait for
    //   permits before serving each piece.
    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        match self.rate_limiter.rate_limit() {
            RateLimit::Pause => pending().await,
            RateLimit::Disabled => {
                if let Some((epoch, chunk, chunk_id)) = self.consume_next_pending_chunk() {
                    Ok((epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }))
                } else {
                    let (epoch, item) = self.inner.next_item().await?;
                    Ok(self.consume_single_upstream_item(epoch, item))
                }
            }
            RateLimit::Fixed(limit) => {
                if self.peek_next_pending_chunk().is_none() {
                    // Nothing pending: pull and, if needed, split a fresh upstream item.
                    let (epoch, item) = self.inner.next_item().await?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                            let chunks = if chunk.rate_limit_permits() < limit.get() {
                                vec![chunk]
                            } else {
                                // Reverse so pieces can be served by popping from the back.
                                let mut chunks = chunk.split(limit.get() as _);
                                chunks.reverse();
                                chunks
                            };
                            assert!(!chunks.is_empty());

                            assert!(
                                self.consuming_chunk
                                    .replace((
                                        UpstreamChunkOffset(TruncateOffset::Chunk {
                                            epoch,
                                            chunk_id
                                        }),
                                        VecDeque::new(),
                                        chunks,
                                    ))
                                    .is_none()
                            );
                        }
                        item @ LogStoreReadItem::Barrier { .. } => {
                            // Barriers are never throttled.
                            return Ok(self.consume_single_upstream_item(epoch, item));
                        }
                    };
                }
                // Wait until the limiter grants permits for the next piece.
                let chunk = self.peek_next_pending_chunk().expect("must Some");
                self.rate_limiter.wait_chunk(chunk).await;
                let (epoch, chunk, chunk_id) =
                    self.consume_next_pending_chunk().expect("must Some");
                Ok((epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }))
            }
        }
    }
}
597
impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        loop {
            select! {
                // `biased` ensures rate-limit updates are applied before serving items.
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.core.rate_limiter.update(new_rate_limit);
                    let paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                item = self.core.next_item() => {
                    return item;
                }
            }
        }
    }

    // Translate a downstream truncate offset into the largest fully-covered
    // upstream offset and forward that to the inner reader.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        let downstream_offset = DownstreamChunkOffset(offset);
        let mut truncate_offset = None;
        let mut stop = false;
        // Queue holds newest entries at the front, so walk from the back (oldest).
        'outer: while let Some((upstream_offset, downstream_offsets)) =
            self.core.consumed_offset_queue.back_mut()
        {
            while let Some(prev_downstream_offset) = downstream_offsets.back() {
                if *prev_downstream_offset <= downstream_offset {
                    downstream_offsets.pop_back();
                } else {
                    // Entry only partially covered: its upstream offset cannot
                    // be truncated yet.
                    stop = true;
                    break 'outer;
                }
            }
            // All downstream pieces of this upstream item are truncated.
            truncate_offset = Some(*upstream_offset);
            self.core.consumed_offset_queue.pop_back();
        }
        // Also discharge covered pieces of the chunk currently being consumed.
        if !stop && let Some((_, downstream_offsets, _)) = &mut self.core.consuming_chunk {
            while let Some(prev_downstream_offset) = downstream_offsets.back() {
                if *prev_downstream_offset <= downstream_offset {
                    downstream_offsets.pop_back();
                } else {
                    break;
                }
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset.0)
        } else {
            // No upstream item fully covered yet: nothing to forward.
            Ok(())
        }
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Discard split/offset bookkeeping before replaying from the inner reader.
        self.core.consuming_chunk = None;
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}
677
#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    /// Wrap `self` so that every yielded chunk is mapped through `f`.
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    /// Wrap `self` with read metrics plus backpressure monitoring.
    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        // The backpressure counter is shared with the outer wrapper.
        let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    /// Wrap `self` with dynamic rate limiting controlled via `control_rx`.
    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}
703
/// Writer adapter recording write-epoch and row-count metrics.
pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}

/// Metrics reported by [`MonitoredLogWriter`].
pub struct LogWriterMetrics {
    pub log_store_first_write_epoch: LabelGuardedIntGauge,
    pub log_store_latest_write_epoch: LabelGuardedIntGauge,
    pub log_store_write_rows: LabelGuardedIntCounter,
}
715
716impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
717 async fn init(
718 &mut self,
719 epoch: EpochPair,
720 pause_read_on_bootstrap: bool,
721 ) -> LogStoreResult<()> {
722 self.metrics
723 .log_store_first_write_epoch
724 .set(epoch.curr as _);
725 self.metrics
726 .log_store_latest_write_epoch
727 .set(epoch.curr as _);
728 self.inner.init(epoch, pause_read_on_bootstrap).await
729 }
730
731 async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
732 self.metrics
733 .log_store_write_rows
734 .inc_by(chunk.cardinality() as _);
735 self.inner.write_chunk(chunk).await
736 }
737
738 async fn flush_current_epoch(
739 &mut self,
740 next_epoch: u64,
741 options: FlushCurrentEpochOptions,
742 ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
743 let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
744 self.metrics
745 .log_store_latest_write_epoch
746 .set(next_epoch as _);
747 Ok(post_flush)
748 }
749
750 fn pause(&mut self) -> LogStoreResult<()> {
751 self.inner.pause()
752 }
753
754 fn resume(&mut self) -> LogStoreResult<()> {
755 self.inner.resume()
756 }
757}
758
#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
    T: LogWriter + Sized,
{
    /// Wrap `self` so that writes and epoch flushes update `metrics`.
    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
        MonitoredLogWriter {
            inner: self,
            metrics,
        }
    }
}
771
/// A queued item whose delivery is being tracked.
enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        // Futures that must all complete before the chunk counts as delivered.
        futures: VecDeque<F>,
    },
    Barrier,
}

/// Tracks in-flight delivery futures per chunk/barrier, bounding how many are
/// in flight and computing the next [`TruncateOffset`] as deliveries complete
/// in order.
pub struct DeliveryFutureManager<F> {
    // Total futures currently tracked across all items.
    future_count: usize,
    // Upper bound on `future_count`; adding beyond it awaits older deliveries.
    max_future_count: usize,
    // Items in write order: (epoch, chunk-or-barrier).
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}
787
impl<F> DeliveryFutureManager<F> {
    /// Create a manager allowing at most `max_future_count` in-flight futures.
    pub fn new(max_future_count: usize) -> Self {
        Self {
            future_count: 0,
            max_future_count,
            items: Default::default(),
        }
    }

    /// Record a barrier for `epoch`. Barrier epochs must strictly increase,
    /// and a barrier following a chunk must share the chunk's epoch.
    pub fn add_barrier(&mut self, epoch: u64) {
        if let Some((item_epoch, last_item)) = self.items.back() {
            match last_item {
                DeliveryFutureManagerItem::Chunk { .. } => {
                    assert_eq!(*item_epoch, epoch)
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new barrier epoch {} should be greater than prev barrier {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items
            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
    }

    /// Begin tracking a new chunk of `epoch` with `chunk_id`, returning a
    /// handle for attaching its delivery futures. Chunk ids must increase
    /// within an epoch; epochs must increase across barriers.
    pub fn start_write_chunk(
        &mut self,
        epoch: u64,
        chunk_id: ChunkId,
    ) -> DeliveryFutureManagerAddFuture<'_, F> {
        if let Some((item_epoch, item)) = self.items.back() {
            match item {
                DeliveryFutureManagerItem::Chunk {
                    chunk_id: item_chunk_id,
                    ..
                } => {
                    assert_eq!(epoch, *item_epoch);
                    assert!(
                        chunk_id > *item_chunk_id,
                        "new chunk id {} should be greater than prev chunk id {}",
                        chunk_id,
                        item_chunk_id
                    );
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new chunk epoch {} should be greater than prev barrier: {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items.push_back((
            epoch,
            DeliveryFutureManagerItem::Chunk {
                chunk_id,
                futures: VecDeque::new(),
            },
        ));
        DeliveryFutureManagerAddFuture(self)
    }
}
856
/// Handle for adding delivery futures to the chunk most recently started via
/// [`DeliveryFutureManager::start_write_chunk`].
pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);
858
859impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
860 pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
863 let mut has_await = false;
864 while self.0.future_count >= self.0.max_future_count {
865 self.await_one_delivery().await?;
866 has_await = true;
867 }
868 match self.0.items.back_mut() {
869 Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
870 futures.push_back(future);
871 self.0.future_count += 1;
872 Ok(has_await)
873 }
874 _ => unreachable!("should add future only after add a new chunk"),
875 }
876 }
877
878 pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
879 for (_, item) in &mut self.0.items {
880 if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
881 && let Some(mut delivery_future) = futures.pop_front()
882 {
883 self.0.future_count -= 1;
884 return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
885 } else {
886 continue;
887 }
888 }
889 Ok(())
890 }
891
892 pub fn future_count(&self) -> usize {
893 self.0.future_count
894 }
895
896 pub fn max_future_count(&self) -> usize {
897 self.0.max_future_count
898 }
899}
900
impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
    /// Resolve once a new truncate offset is available, i.e. once the oldest
    /// item(s) have all their delivery futures completed. Futures are polled
    /// in FIFO order; the scan stops at the first pending future or right
    /// after a completed barrier.
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        // Fail fast on the first delivery error.
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    // Oldest remaining future not ready: report
                                    // whatever completed so far (if anything).
                                    break 'outer;
                                }
                            }
                        }

                        assert!(futures.is_empty());
                        // Whole chunk delivered: becomes the latest candidate offset.
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        // Report at most up to one barrier per call.
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                Poll::Pending
            }
        })
    }
}
952
953#[cfg(test)]
954mod tests {
955 use std::collections::VecDeque;
956 use std::future::{Future, poll_fn};
957 use std::pin::pin;
958 use std::task::Poll;
959
960 use futures::{FutureExt, TryFuture};
961 use risingwave_common::array::StreamChunk;
962 use risingwave_common::util::epoch::test_epoch;
963 use tokio::sync::oneshot;
964 use tokio::sync::oneshot::Receiver;
965
966 use super::{LogReader, LogStoreReadItem, LogStoreResult, TruncateBarrierLogReader};
967 use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};
968
    /// In-memory [`LogReader`] that yields a fixed, scripted sequence of items
    /// and records every truncate offset it receives.
    struct MockLogReader {
        items: VecDeque<(u64, LogStoreReadItem)>,
        // Offsets passed to `truncate`, in call order.
        truncate_calls: Vec<TruncateOffset>,
    }

    impl MockLogReader {
        fn new(items: impl IntoIterator<Item = (u64, LogStoreReadItem)>) -> Self {
            Self {
                items: items.into_iter().collect(),
                truncate_calls: Vec::new(),
            }
        }
    }

    impl LogReader for MockLogReader {
        async fn init(&mut self) -> LogStoreResult<()> {
            Ok(())
        }

        async fn start_from(&mut self, _start_offset: Option<u64>) -> LogStoreResult<()> {
            Ok(())
        }

        async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
            // Fail once the scripted items are exhausted.
            self.items
                .pop_front()
                .ok_or_else(|| anyhow::anyhow!("no item"))
        }

        fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
            self.truncate_calls.push(offset);
            Ok(())
        }

        async fn rewind(&mut self) -> LogStoreResult<()> {
            Ok(())
        }
    }
1007
    /// Ordering of [`TruncateOffset`]: by epoch first, with a barrier sorting
    /// after all chunks of the same epoch.
    #[test]
    fn test_truncate_offset_cmp() {
        assert!(
            TruncateOffset::Barrier { epoch: 232 }
                < TruncateOffset::Chunk {
                    epoch: 233,
                    chunk_id: 1
                }
        );
        assert_eq!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            },
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            } < TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            }
        );
        assert!(
            TruncateOffset::Barrier { epoch: 1 }
                > TruncateOffset::Chunk {
                    epoch: 1,
                    chunk_id: 2
                }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            } < TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 2,
                chunk_id: 2
            } > TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
    }

    // Boxed future standing in for a sink delivery future in these tests.
    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;

    #[define_opaque(TestFuture)]
    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
        async move { rx.await.unwrap() }.boxed()
    }

    /// With no tracked items there is no truncate offset to report.
    #[tokio::test]
    async fn test_empty() {
        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
        let mut future = pin!(manager.next_truncate_offset());
        assert!(
            poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
    }
1075
    /// A chunk's offset is reported once its delivery future completes, and a
    /// barrier's offset is reported immediately after.
    #[tokio::test]
    async fn test_future_delivery_manager_basic() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch1 = 233;
        let chunk_id1 = 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
        assert!(
            !write_chunk
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 1);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            // Pending until the delivery future resolves.
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id1
                }
            );
        }
        assert_eq!(manager.future_count, 0);
        manager.add_barrier(epoch1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: epoch1 }
        );
    }
1113
    /// Consecutive completed chunks are compressed into the latest chunk offset,
    /// and reporting stops at a barrier even when later futures are done.
    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let chunk_id3 = chunk_id2 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id1)
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id2)
                .add_future_may_await(to_test_future(rx1_2))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id3)
                .add_future_may_await(to_test_future(rx1_3))
                .await
                .unwrap()
        );
        manager.add_barrier(epoch1);
        assert!(
            !manager
                .start_write_chunk(epoch2, chunk_id1)
                .add_future_may_await(to_test_future(rx2_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 4);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // Completing a later chunk alone is not enough: the oldest is still pending.
            tx1_2.send(Ok(())).unwrap();
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            // Chunks 1 and 2 are both done: compressed into chunk 2's offset.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
            // Reporting stops at the barrier even though epoch2's chunk is done.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }
1204
    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        // Manager created with 2 — from the behavior below this appears to be
        // the max number of in-flight delivery futures before an add blocks.
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        // One oneshot per delivery future; sending on `tx*` resolves the
        // matching `rx*`-backed future.
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
            // Below the limit: both adds return `false` (no prior future had
            // to be awaited to make room).
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
            {
                // At the limit: the add stays pending until an earlier future
                // (rx1_1) completes.
                let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_1.send(Ok(())).unwrap();
                // Returns `true` here — presumably signalling that an existing
                // future was awaited before this one was registered.
                assert!(future1.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                // Same pattern: rx1_2 must resolve before rx2_2 can be added.
                let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_2.send(Ok(())).unwrap();
                assert!(future2.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                // Explicitly await one delivery (rx2_1), dropping the pending
                // count for this chunk to 1.
                let mut future3 = pin!(write_chunk.await_one_delivery());
                assert!(
                    poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx2_1.send(Ok(())).unwrap();
                future3.await.unwrap();
            }
            assert_eq!(1, write_chunk.future_count());
        }

        // All futures for chunk_id1 already finished, so its truncate offset
        // is immediately available without further sends.
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            // chunk_id2 still has rx2_2 outstanding: next_truncate_offset
            // stays pending until it completes.
            let mut future = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                future.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }
1299
1300 #[tokio::test]
1301 async fn test_truncate_barrier_reader_truncates_previous_barriers_first() {
1302 let inner = MockLogReader::new([
1303 (
1304 1,
1305 LogStoreReadItem::Barrier {
1306 is_checkpoint: false,
1307 new_vnode_bitmap: None,
1308 is_stop: false,
1309 schema_change: None,
1310 },
1311 ),
1312 (
1313 2,
1314 LogStoreReadItem::Barrier {
1315 is_checkpoint: false,
1316 new_vnode_bitmap: None,
1317 is_stop: false,
1318 schema_change: None,
1319 },
1320 ),
1321 ]);
1322 let mut reader = TruncateBarrierLogReader::new(inner);
1323
1324 reader.init().await.unwrap();
1325 reader.start_from(None).await.unwrap();
1326 reader.next_item().await.unwrap();
1327 reader.next_item().await.unwrap();
1328 reader
1329 .truncate(TruncateOffset::Barrier { epoch: 2 })
1330 .unwrap();
1331
1332 assert_eq!(
1333 reader.inner.truncate_calls,
1334 vec![
1335 TruncateOffset::Barrier { epoch: 1 },
1336 TruncateOffset::Barrier { epoch: 2 },
1337 ]
1338 );
1339 }
1340
1341 #[tokio::test]
1342 async fn test_truncate_barrier_reader_keeps_later_barrier_pending() {
1343 let inner = MockLogReader::new([
1344 (
1345 1,
1346 LogStoreReadItem::Barrier {
1347 is_checkpoint: false,
1348 new_vnode_bitmap: None,
1349 is_stop: false,
1350 schema_change: None,
1351 },
1352 ),
1353 (
1354 2,
1355 LogStoreReadItem::StreamChunk {
1356 chunk: StreamChunk::default(),
1357 chunk_id: 0,
1358 },
1359 ),
1360 (
1361 2,
1362 LogStoreReadItem::Barrier {
1363 is_checkpoint: false,
1364 new_vnode_bitmap: None,
1365 is_stop: false,
1366 schema_change: None,
1367 },
1368 ),
1369 ]);
1370 let mut reader = TruncateBarrierLogReader::new(inner);
1371
1372 reader.init().await.unwrap();
1373 reader.start_from(None).await.unwrap();
1374 reader.next_item().await.unwrap();
1375 reader.next_item().await.unwrap();
1376 reader.next_item().await.unwrap();
1377 reader
1378 .truncate(TruncateOffset::Chunk {
1379 epoch: 2,
1380 chunk_id: 0,
1381 })
1382 .unwrap();
1383
1384 assert_eq!(
1385 reader.inner.truncate_calls,
1386 vec![
1387 TruncateOffset::Barrier { epoch: 1 },
1388 TruncateOffset::Chunk {
1389 epoch: 2,
1390 chunk_id: 0,
1391 },
1392 ]
1393 );
1394
1395 reader
1396 .truncate(TruncateOffset::Barrier { epoch: 2 })
1397 .unwrap();
1398 assert_eq!(
1399 reader.inner.truncate_calls,
1400 vec![
1401 TruncateOffset::Barrier { epoch: 1 },
1402 TruncateOffset::Chunk {
1403 epoch: 2,
1404 chunk_id: 0,
1405 },
1406 TruncateOffset::Barrier { epoch: 2 },
1407 ]
1408 );
1409 }
1410}