use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::future::{Future, pending, poll_fn};
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;

use await_tree::InstrumentAwait;
use futures::future::BoxFuture;
use futures::{TryFuture, TryFutureExt};
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Field;
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_rate_limit::{RateLimit, RateLimiter};
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;

pub type LogStoreResult<T> = Result<T, anyhow::Error>;
pub type ChunkId = usize;

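/// A position in the log up to which the log store may be truncated: either a specific
/// chunk within an epoch, or the barrier that ends an epoch. Offsets are ordered by
/// `(epoch, chunk_id)`, and a `Barrier` sorts after every `Chunk` of the same epoch
/// (see the `PartialOrd` impl below).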
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TruncateOffset {
    Chunk { epoch: u64, chunk_id: ChunkId },
    Barrier { epoch: u64 },
}

impl PartialOrd for TruncateOffset {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let extract = |offset: &TruncateOffset| match offset {
            TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
            TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
        };
        let this = extract(self);
        let other = extract(other);
        this.partial_cmp(&other)
    }
}

impl TruncateOffset {
    pub fn next_chunk_id(&self) -> ChunkId {
        match self {
            TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
            TruncateOffset::Barrier { .. } => 0,
        }
    }

    pub fn epoch(&self) -> u64 {
        match self {
            TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
        }
    }

    pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
        if *self >= next_offset {
            bail!(
                "next offset {:?} should be later than current offset {:?}",
                next_offset,
                self
            )
        } else {
            Ok(())
        }
    }

    pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
        match self {
            TruncateOffset::Chunk {
                epoch: offset_epoch,
                ..
            } => {
                if epoch != *offset_epoch {
                    bail!(
                        "new item epoch {} does not match current chunk offset epoch {}",
                        epoch,
                        offset_epoch
                    );
                }
            }
            TruncateOffset::Barrier {
                epoch: offset_epoch,
            } => {
                if epoch <= *offset_epoch {
                    bail!(
                        "new item epoch {} does not exceed barrier offset epoch {}",
                        epoch,
                        offset_epoch
                    );
                }
            }
        }
        Ok(())
    }
}

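/// An item emitted by a [`LogReader`]: either a stream chunk tagged with the
/// reader-assigned `chunk_id`, or a barrier carrying the checkpoint/stop flags and any
/// vnode-bitmap or added-column updates for that epoch.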
#[derive(Debug)]
pub enum LogStoreReadItem {
    StreamChunk {
        chunk: StreamChunk,
        chunk_id: ChunkId,
    },
    Barrier {
        is_checkpoint: bool,
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        is_stop: bool,
        add_columns: Option<Vec<Field>>,
    },
}

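/// Callback returned by [`LogWriter::flush_current_epoch`]. Going by its method name, it
/// is meant to be invoked via `post_yield_barrier` once the barrier of the flushed epoch
/// has been yielded downstream, giving the writer a chance to run follow-up work.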
pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;

#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);

impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
    pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
        Self(Box::new(f))
    }

    pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
        self.0().await
    }
}

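/// Describes the barrier that seals the epoch being flushed: whether it is a checkpoint
/// barrier, whether the writer should stop afterwards, and any vnode-bitmap or
/// added-column change taking effect at this barrier.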
pub struct FlushCurrentEpochOptions {
    pub is_checkpoint: bool,
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    pub is_stop: bool,
    pub add_columns: Option<Vec<Field>>,
}

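/// Write side of a log store. A writer is initialized with its first epoch pair, receives
/// the chunks of the current epoch through `write_chunk`, and seals the epoch with
/// `flush_current_epoch`, which hands back a [`LogWriterPostFlushCurrentEpoch`] callback.
/// `pause` and `resume` toggle the paused state of the log store (see also
/// `pause_read_on_bootstrap` in `init`).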
pub trait LogWriter: Send {
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    fn pause(&mut self) -> LogStoreResult<()>;

    fn resume(&mut self) -> LogStoreResult<()>;
}

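/// Read side of a log store. After `init` (and optionally `start_from`), `next_item`
/// yields `(epoch, item)` pairs in order. `truncate` reports that everything up to the
/// given offset has been consumed and may be reclaimed, while `rewind` restarts reading
/// from the position after the latest truncation.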
pub trait LogReader: Send + Sized + 'static {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}

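/// Builds a paired [`LogReader`] and [`LogWriter`]. The two associated constants let an
/// implementation declare whether its reader supports `rewind` and whether the sink has
/// to be rebuilt when the vnode bitmap is updated.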
pub trait LogStoreFactory: Send + 'static {
    const ALLOW_REWIND: bool;
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}

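/// A [`LogReader`] adapter that applies a user-provided transformation to every stream
/// chunk read from the inner reader, leaving barriers and offsets untouched.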
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}

impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
    for TransformChunkLogReader<F, R>
{
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.init()
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let (epoch, item) = self.inner.next_item().await?;
        let item = match item {
            LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
                chunk: (self.f)(chunk),
                chunk_id,
            },
            other => other,
        };
        Ok((epoch, item))
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

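/// A [`LogReader`] adapter that measures backpressure from the consumer: it accumulates in
/// `wait_new_future_duration_ns` the time between returning one item and being polled for
/// the next one.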
pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    wait_new_future_start_time: Option<Instant>,
    wait_new_future_duration_ns: LabelGuardedIntCounter,
}

impl<R: LogReader> BackpressureMonitoredLogReader<R> {
    fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter) -> Self {
        Self {
            inner,
            wait_new_future_start_time: None,
            wait_new_future_duration_ns,
        }
    }
}

impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        self.inner.next_item().inspect_ok(|_| {
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

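/// A [`LogReader`] adapter that reports read metrics: the latest read epoch and the row
/// and byte counts of the chunks read from the inner reader.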
pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    read_epoch: u64,
    metrics: LogReaderMetrics,
}

pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge,
    pub log_store_read_rows: LabelGuardedIntCounter,
    pub log_store_read_bytes: LabelGuardedIntCounter,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter,
}

impl<R: LogReader> MonitoredLogReader<R> {
    pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
        Self {
            inner,
            read_epoch: INVALID_EPOCH,
            metrics,
        }
    }
}

impl<R: LogReader> LogReader for MonitoredLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.inner.init().instrument_await("log_reader_init").await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        self.inner
            .next_item()
            .instrument_await("log_reader_next_item")
            .await
            .inspect(|(epoch, item)| {
                if self.read_epoch != *epoch {
                    self.read_epoch = *epoch;
                    self.metrics.log_store_latest_read_epoch.set(*epoch as _);
                }
                if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
                    self.metrics
                        .log_store_read_rows
                        .inc_by(chunk.cardinality() as _);
                    self.metrics
                        .log_store_read_bytes
                        .inc_by(chunk.estimated_size() as u64);
                }
            })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().instrument_await("log_reader_rewind")
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

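// Offsets as numbered by the inner (upstream) reader versus offsets handed out to the
// consumer (downstream) after chunks may have been split and re-numbered for rate
// limiting.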
#[derive(Copy, Clone, PartialOrd, PartialEq, Debug)]
struct UpstreamChunkOffset(TruncateOffset);
#[derive(Copy, Clone, PartialOrd, PartialEq)]
struct DownstreamChunkOffset(TruncateOffset);

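/// State of [`RateLimitedLogReader`]: the upstream chunk currently being consumed together
/// with the downstream offsets already emitted for its remaining split pieces, the queue
/// of fully consumed upstream offsets awaiting truncation, and the counter used to assign
/// downstream chunk ids.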
struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    consuming_chunk: Option<(
        UpstreamChunkOffset,
        VecDeque<DownstreamChunkOffset>,
        Vec<StreamChunk>,
    )>,
    consumed_offset_queue: VecDeque<(UpstreamChunkOffset, VecDeque<DownstreamChunkOffset>)>,
    next_chunk_id: usize,
    rate_limiter: RateLimiter,
}

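/// A [`LogReader`] adapter that throttles chunks from the inner reader according to a
/// [`RateLimit`] which can be updated at runtime through `control_rx`.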
pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    control_rx: UnboundedReceiver<RateLimit>,
}

impl<R: LogReader> RateLimitedLogReader<R> {
    pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
        Self {
            core: RateLimitedLogReaderCore {
                inner,
                consuming_chunk: None,
                consumed_offset_queue: VecDeque::new(),
                next_chunk_id: 0,
                rate_limiter: RateLimiter::new(RateLimit::Disabled),
            },
            control_rx,
        }
    }
}

impl<R: LogReader> RateLimitedLogReaderCore<R> {
    fn peek_next_pending_chunk(&self) -> Option<&StreamChunk> {
        self.consuming_chunk
            .as_ref()
            .and_then(|(_, _, chunk)| chunk.last())
    }

    fn consume_next_pending_chunk(&mut self) -> Option<(u64, StreamChunk, ChunkId)> {
        let Some((upstream_offset, consumed_offsets, pending_chunk)) = &mut self.consuming_chunk
        else {
            return None;
        };
        let epoch = upstream_offset.0.epoch();

        let item = pending_chunk.pop().map(|chunk| {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            consumed_offsets.push_front(DownstreamChunkOffset(TruncateOffset::Chunk {
                epoch,
                chunk_id,
            }));
            (epoch, chunk, chunk_id)
        });
        if pending_chunk.is_empty() {
            let (upstream_offset, consumed_offsets, _) =
                self.consuming_chunk.take().expect("checked some");
            self.consumed_offset_queue
                .push_front((upstream_offset, consumed_offsets));
        }
        item
    }

    fn consume_single_upstream_item(
        &mut self,
        epoch: u64,
        mut item: LogStoreReadItem,
    ) -> (u64, LogStoreReadItem) {
        assert!(self.consuming_chunk.is_none());
        let (upstream_offset, downstream_offset) = match &mut item {
            LogStoreReadItem::StreamChunk { chunk_id, .. } => {
                let upstream_chunk_id = *chunk_id;
                let downstream_chunk_id = self.next_chunk_id;
                self.next_chunk_id += 1;
                *chunk_id = downstream_chunk_id;
                (
                    UpstreamChunkOffset(TruncateOffset::Chunk {
                        epoch,
                        chunk_id: upstream_chunk_id,
                    }),
                    DownstreamChunkOffset(TruncateOffset::Chunk {
                        epoch,
                        chunk_id: downstream_chunk_id,
                    }),
                )
            }
            LogStoreReadItem::Barrier { .. } => (
                UpstreamChunkOffset(TruncateOffset::Barrier { epoch }),
                DownstreamChunkOffset(TruncateOffset::Barrier { epoch }),
            ),
        };
        self.consumed_offset_queue
            .push_front((upstream_offset, VecDeque::from_iter([downstream_offset])));
        (epoch, item)
    }

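    // Drives one step of rate-limited reading. `Pause` blocks here (the `select!` in
    // `RateLimitedLogReader::next_item` keeps watching for rate-limit updates); `Disabled`
    // passes items through, draining any previously split chunk first; `Fixed` splits
    // oversized chunks and waits for permits before emitting each piece.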
    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        match self.rate_limiter.rate_limit() {
            RateLimit::Pause => pending().await,
            RateLimit::Disabled => {
                if let Some((epoch, chunk, chunk_id)) = self.consume_next_pending_chunk() {
                    Ok((epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }))
                } else {
                    let (epoch, item) = self.inner.next_item().await?;
                    Ok(self.consume_single_upstream_item(epoch, item))
                }
            }
            RateLimit::Fixed(limit) => {
                if self.peek_next_pending_chunk().is_none() {
                    let (epoch, item) = self.inner.next_item().await?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                            let chunks = if chunk.rate_limit_permits() < limit.get() {
                                vec![chunk]
                            } else {
                                let mut chunks = chunk.split(limit.get() as _);
                                chunks.reverse();
                                chunks
                            };
                            assert!(!chunks.is_empty());

                            assert!(
                                self.consuming_chunk
                                    .replace((
                                        UpstreamChunkOffset(TruncateOffset::Chunk {
                                            epoch,
                                            chunk_id
                                        }),
                                        VecDeque::new(),
                                        chunks,
                                    ))
                                    .is_none()
                            );
                        }
                        item @ LogStoreReadItem::Barrier { .. } => {
                            return Ok(self.consume_single_upstream_item(epoch, item));
                        }
                    };
                }
                let chunk = self.peek_next_pending_chunk().expect("must Some");
                self.rate_limiter.wait_chunk(chunk).await;
                let (epoch, chunk, chunk_id) =
                    self.consume_next_pending_chunk().expect("must Some");
                Ok((epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }))
            }
        }
    }
}

impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        loop {
            select! {
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.core.rate_limiter.update(new_rate_limit);
                    let paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                item = self.core.next_item() => {
                    return item;
                }
            }
        }
    }

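    // Translates a downstream truncate offset back to an upstream one: pop every recorded
    // downstream offset at or before `offset`, and truncate the inner reader up to the
    // last upstream offset whose downstream offsets have all been consumed.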
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        let downstream_offset = DownstreamChunkOffset(offset);
        let mut truncate_offset = None;
        let mut stop = false;
        'outer: while let Some((upstream_offset, downstream_offsets)) =
            self.core.consumed_offset_queue.back_mut()
        {
            while let Some(prev_downstream_offset) = downstream_offsets.back() {
                if *prev_downstream_offset <= downstream_offset {
                    downstream_offsets.pop_back();
                } else {
                    stop = true;
                    break 'outer;
                }
            }
            truncate_offset = Some(*upstream_offset);
            self.core.consumed_offset_queue.pop_back();
        }
        if !stop && let Some((_, downstream_offsets, _)) = &mut self.core.consuming_chunk {
            while let Some(prev_downstream_offset) = downstream_offsets.back() {
                if *prev_downstream_offset <= downstream_offset {
                    downstream_offsets.pop_back();
                } else {
                    break;
                }
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset.0)
        } else {
            Ok(())
        }
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.consuming_chunk = None;
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}

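/// Combinators for assembling a [`LogReader`] pipeline: `transform_chunk` maps every
/// chunk, `rate_limited` applies a runtime-adjustable rate limit, and `monitored` adds
/// read metrics together with backpressure accounting.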
#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        let wait_new_future_duration =
            metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}

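/// A [`LogWriter`] adapter that reports write metrics: the first and latest written
/// epochs and the number of rows written.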
pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}

pub struct LogWriterMetrics {
    pub log_store_first_write_epoch: LabelGuardedIntGauge,
    pub log_store_latest_write_epoch: LabelGuardedIntGauge,
    pub log_store_write_rows: LabelGuardedIntCounter,
}

impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
    async fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> LogStoreResult<()> {
        self.metrics
            .log_store_first_write_epoch
            .set(epoch.curr as _);
        self.metrics
            .log_store_latest_write_epoch
            .set(epoch.curr as _);
        self.inner.init(epoch, pause_read_on_bootstrap).await
    }

    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
        self.metrics
            .log_store_write_rows
            .inc_by(chunk.cardinality() as _);
        self.inner.write_chunk(chunk).await
    }

    async fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
        let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
        self.metrics
            .log_store_latest_write_epoch
            .set(next_epoch as _);
        Ok(post_flush)
    }

    fn pause(&mut self) -> LogStoreResult<()> {
        self.inner.pause()
    }

    fn resume(&mut self) -> LogStoreResult<()> {
        self.inner.resume()
    }
}

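/// Extension methods for [`LogWriter`]; currently just `monitored`, which wraps the
/// writer in a [`MonitoredLogWriter`].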
#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
    T: LogWriter + Sized,
{
    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
        MonitoredLogWriter {
            inner: self,
            metrics,
        }
    }
}

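/// A per-epoch entry tracked by [`DeliveryFutureManager`]: either a written chunk together
/// with its outstanding delivery futures, or a barrier.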
enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        futures: VecDeque<F>,
    },
    Barrier,
}

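/// Tracks the in-flight delivery futures of written chunks so that a truncate offset can
/// be reported once earlier deliveries complete. Typical flow, as exercised by the tests
/// below: `start_write_chunk` registers a chunk and returns a handle for attaching its
/// delivery futures, `add_barrier` marks the end of an epoch, and `next_truncate_offset`
/// polls the pending futures and yields the latest offset whose deliveries have all
/// finished.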
pub struct DeliveryFutureManager<F> {
    future_count: usize,
    max_future_count: usize,
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}

impl<F> DeliveryFutureManager<F> {
    pub fn new(max_future_count: usize) -> Self {
        Self {
            future_count: 0,
            max_future_count,
            items: Default::default(),
        }
    }

    pub fn add_barrier(&mut self, epoch: u64) {
        if let Some((item_epoch, last_item)) = self.items.back() {
            match last_item {
                DeliveryFutureManagerItem::Chunk { .. } => {
                    assert_eq!(*item_epoch, epoch)
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new barrier epoch {} should be greater than prev barrier {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items
            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
    }

    pub fn start_write_chunk(
        &mut self,
        epoch: u64,
        chunk_id: ChunkId,
    ) -> DeliveryFutureManagerAddFuture<'_, F> {
        if let Some((item_epoch, item)) = self.items.back() {
            match item {
                DeliveryFutureManagerItem::Chunk {
                    chunk_id: item_chunk_id,
                    ..
                } => {
                    assert_eq!(epoch, *item_epoch);
                    assert!(
                        chunk_id > *item_chunk_id,
                        "new chunk id {} should be greater than prev chunk id {}",
                        chunk_id,
                        item_chunk_id
                    );
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new chunk epoch {} should be greater than prev barrier: {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items.push_back((
            epoch,
            DeliveryFutureManagerItem::Chunk {
                chunk_id,
                futures: VecDeque::new(),
            },
        ));
        DeliveryFutureManagerAddFuture(self)
    }
}

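/// Write handle returned by [`DeliveryFutureManager::start_write_chunk`]. Adding a future
/// may first have to await and drain older deliveries when the manager is already at
/// `max_future_count`, which is why `add_future_may_await` reports whether it awaited.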
pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);

impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
    pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
        let mut has_await = false;
        while self.0.future_count >= self.0.max_future_count {
            self.await_one_delivery().await?;
            has_await = true;
        }
        match self.0.items.back_mut() {
            Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
                futures.push_back(future);
                self.0.future_count += 1;
                Ok(has_await)
            }
            _ => unreachable!("should add future only after add a new chunk"),
        }
    }

    pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
        for (_, item) in &mut self.0.items {
            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
                && let Some(mut delivery_future) = futures.pop_front()
            {
                self.0.future_count -= 1;
                return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
            } else {
                continue;
            }
        }
        Ok(())
    }

    pub fn future_count(&self) -> usize {
        self.0.future_count
    }

    pub fn max_future_count(&self) -> usize {
        self.0.max_future_count
    }
}

impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
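    /// Polls the registered delivery futures in order and resolves once at least one fully
    /// delivered offset is available, returning the latest chunk or barrier offset whose
    /// futures have all completed. Scanning stops at the first pending future or right
    /// after a barrier.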
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    break 'outer;
                                }
                            }
                        }

                        assert!(futures.is_empty());
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                Poll::Pending
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::task::Poll;

    use futures::{FutureExt, TryFuture};
    use risingwave_common::util::epoch::test_epoch;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::Receiver;

    use super::LogStoreResult;
    use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};

    #[test]
    fn test_truncate_offset_cmp() {
        assert!(
            TruncateOffset::Barrier { epoch: 232 }
                < TruncateOffset::Chunk {
                    epoch: 233,
                    chunk_id: 1
                }
        );
        assert_eq!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            },
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            } < TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            }
        );
        assert!(
            TruncateOffset::Barrier { epoch: 1 }
                > TruncateOffset::Chunk {
                    epoch: 1,
                    chunk_id: 2
                }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            } < TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 2,
                chunk_id: 2
            } > TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
    }

    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;

    #[define_opaque(TestFuture)]
    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
        async move { rx.await.unwrap() }.boxed()
    }

    #[tokio::test]
    async fn test_empty() {
        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
        let mut future = pin!(manager.next_truncate_offset());
        assert!(
            poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_basic() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch1 = 233;
        let chunk_id1 = 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
        assert!(
            !write_chunk
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 1);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id1
                }
            );
        }
        assert_eq!(manager.future_count, 0);
        manager.add_barrier(epoch1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: epoch1 }
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let chunk_id3 = chunk_id2 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id1)
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id2)
                .add_future_may_await(to_test_future(rx1_2))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id3)
                .add_future_may_await(to_test_future(rx1_3))
                .await
                .unwrap()
        );
        manager.add_barrier(epoch1);
        assert!(
            !manager
                .start_write_chunk(epoch2, chunk_id1)
                .add_future_may_await(to_test_future(rx2_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 4);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_2.send(Ok(())).unwrap();
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
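            // Completing rx1_1 finishes the deliveries of both chunk_id1 and chunk_id2, so
            // the two chunk offsets are compressed into the later one.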
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
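            // rx1_3 and rx2_1 are both completed, but the scan stops right after the epoch1
            // barrier, so the barrier offset is reported before epoch2's chunk.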
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
            {
                let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_1.send(Ok(())).unwrap();
                assert!(future1.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_2.send(Ok(())).unwrap();
                assert!(future2.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future3 = pin!(write_chunk.await_one_delivery());
                assert!(
                    poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx2_1.send(Ok(())).unwrap();
                future3.await.unwrap();
            }
            assert_eq!(1, write_chunk.future_count());
        }

        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            let mut future = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                future.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }
}