risingwave_connector/sink/log_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::VecDeque;
17use std::fmt::Debug;
18use std::future::{Future, poll_fn};
19use std::pin::pin;
20use std::sync::Arc;
21use std::task::Poll;
22use std::time::Instant;
23
24use await_tree::InstrumentAwait;
25use either::Either;
26use futures::future::BoxFuture;
27use futures::{TryFuture, TryFutureExt};
28use risingwave_common::array::StreamChunk;
29use risingwave_common::bail;
30use risingwave_common::bitmap::Bitmap;
31use risingwave_common::catalog::Field;
32use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
33use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
34use risingwave_common_estimate_size::EstimateSize;
35use risingwave_common_rate_limit::{RateLimit, RateLimiter};
36use tokio::select;
37use tokio::sync::mpsc::UnboundedReceiver;
38
/// Result alias used throughout the log store for fallible operations.
pub type LogStoreResult<T> = Result<T, anyhow::Error>;
/// Identifier of a chunk within an epoch; restarts from 0 after each barrier.
pub type ChunkId = usize;
41
/// Offset up to which the consumed log may be truncated.
///
/// Within an epoch, all chunk offsets order before the epoch's barrier offset
/// (see the `PartialOrd` impl below).
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TruncateOffset {
    /// Position after chunk `chunk_id` of `epoch`.
    Chunk { epoch: u64, chunk_id: ChunkId },
    /// Position after the barrier of `epoch`.
    Barrier { epoch: u64 },
}
47
48impl PartialOrd for TruncateOffset {
49    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
50        let extract = |offset: &TruncateOffset| match offset {
51            TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
52            TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
53        };
54        let this = extract(self);
55        let other = extract(other);
56        this.partial_cmp(&other)
57    }
58}
59
60impl TruncateOffset {
61    pub fn next_chunk_id(&self) -> ChunkId {
62        match self {
63            TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
64            TruncateOffset::Barrier { .. } => 0,
65        }
66    }
67
68    pub fn epoch(&self) -> u64 {
69        match self {
70            TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
71        }
72    }
73
74    pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
75        if *self >= next_offset {
76            bail!(
77                "next offset {:?} should be later than current offset {:?}",
78                next_offset,
79                self
80            )
81        } else {
82            Ok(())
83        }
84    }
85
86    pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
87        match self {
88            TruncateOffset::Chunk {
89                epoch: offset_epoch,
90                ..
91            } => {
92                if epoch != *offset_epoch {
93                    bail!(
94                        "new item epoch {} does not match current chunk offset epoch {}",
95                        epoch,
96                        offset_epoch
97                    );
98                }
99            }
100            TruncateOffset::Barrier {
101                epoch: offset_epoch,
102            } => {
103                if epoch <= *offset_epoch {
104                    bail!(
105                        "new item epoch {} does not exceed barrier offset epoch {}",
106                        epoch,
107                        offset_epoch
108                    );
109                }
110            }
111        }
112        Ok(())
113    }
114}
115
/// An item emitted by a [`LogReader`].
#[derive(Debug)]
pub enum LogStoreReadItem {
    /// A data chunk belonging to the current epoch.
    StreamChunk {
        chunk: StreamChunk,
        // Chunk id within the current epoch.
        chunk_id: ChunkId,
    },
    /// An epoch boundary.
    Barrier {
        is_checkpoint: bool,
        // NOTE(review): presumably the vnode bitmap taking effect after this
        // barrier — confirm against the writer side.
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        is_stop: bool,
        add_columns: Option<Vec<Field>>,
    },
}
129
/// Callback invoked after the barrier of the flushed epoch has been yielded downstream.
pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;

/// One-shot guard returned by [`LogWriter::flush_current_epoch`]; consume it via
/// [`LogWriterPostFlushCurrentEpoch::post_yield_barrier`].
#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);
136
137impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
138    pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
139        Self(Box::new(f))
140    }
141
142    pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
143        self.0().await
144    }
145}
146
/// Options passed to [`LogWriter::flush_current_epoch`]. The fields mirror the
/// payload of [`LogStoreReadItem::Barrier`] on the read side.
pub struct FlushCurrentEpochOptions {
    pub is_checkpoint: bool,
    // NOTE(review): presumably the vnode bitmap taking effect after this epoch
    // — confirm against callers.
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    pub is_stop: bool,
    pub add_columns: Option<Vec<Field>>,
}
153
/// Writer side of a log store.
pub trait LogWriter: Send {
    /// Initialize the log writer with an epoch
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Write a stream chunk to the log writer
    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Mark current epoch as finished and sealed, and flush the unconsumed log data.
    ///
    /// Returns a [`LogWriterPostFlushCurrentEpoch`] to be invoked after the barrier
    /// has been yielded downstream.
    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    /// Pause the log writer. See [`Self::resume`] for the counterpart.
    fn pause(&mut self) -> LogStoreResult<()>;

    /// Resume a previously paused log writer.
    fn resume(&mut self) -> LogStoreResult<()>;
}
179
/// Reader side of a log store.
pub trait LogReader: Send + Sized + 'static {
    /// Initialize the log reader. Usually function as waiting for log writer to be initialized.
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Consume log store from given `start_offset` or aligned start offset recorded previously.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    /// Reset the log reader to after the latest truncate offset
    ///
    /// The return flag means whether the log store support rewind
    /// NOTE(review): the signature returns `LogStoreResult<()>` — no flag is
    /// returned; the sentence above appears stale (see `LogStoreFactory::ALLOW_REWIND`).
    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}
206
/// Factory that builds a paired log reader and writer.
pub trait LogStoreFactory: Send + 'static {
    /// Whether the produced reader supports [`LogReader::rewind`].
    const ALLOW_REWIND: bool;
    /// Whether the sink should be rebuilt when the vnode bitmap is updated.
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    /// Build the reader/writer pair.
    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
215
/// A [`LogReader`] adapter that applies `f` to every stream chunk read from `inner`.
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}
220
221impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
222    for TransformChunkLogReader<F, R>
223{
224    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
225        self.inner.init()
226    }
227
228    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
229        let (epoch, item) = self.inner.next_item().await?;
230        let item = match item {
231            LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
232                chunk: (self.f)(chunk),
233                chunk_id,
234            },
235            other => other,
236        };
237        Ok((epoch, item))
238    }
239
240    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
241        self.inner.truncate(offset)
242    }
243
244    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
245        self.inner.rewind()
246    }
247
248    fn start_from(
249        &mut self,
250        start_offset: Option<u64>,
251    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
252        self.inner.start_from(start_offset)
253    }
254}
255
/// A [`LogReader`] adapter that measures downstream backpressure: the time between
/// one item becoming ready and the consumer asking for the next one.
pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    /// Start time to wait for new future after poll ready
    wait_new_future_start_time: Option<Instant>,
    // Accumulated wait time, in nanoseconds.
    wait_new_future_duration_ns: LabelGuardedIntCounter,
}
262
impl<R: LogReader> BackpressureMonitoredLogReader<R> {
    /// Wrap `inner`, accumulating wait time into `wait_new_future_duration_ns`.
    fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter) -> Self {
        Self {
            inner,
            // No measurement until the first item has been emitted.
            wait_new_future_start_time: None,
            wait_new_future_duration_ns,
        }
    }
}
272
impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Reset the timer: time spent before init should not count as backpressure.
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        // The gap between the previous item being ready and this call measures
        // how long the consumer took before asking again, i.e. backpressure.
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        self.inner.next_item().inspect_ok(|_| {
            // Set start time when return ready
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            // A successful rewind restarts the stream; discard any pending measurement.
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}
309
/// A [`LogReader`] adapter that records read metrics (epoch, rows, bytes).
pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    // Last epoch observed, to avoid redundant gauge updates.
    read_epoch: u64,
    metrics: LogReaderMetrics,
}
315
/// Metrics recorded by [`MonitoredLogReader`] and [`BackpressureMonitoredLogReader`].
pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge,
    pub log_store_read_rows: LabelGuardedIntCounter,
    pub log_store_read_bytes: LabelGuardedIntCounter,
    // Total time (ns) spent waiting for the consumer between items.
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter,
}
322
impl<R: LogReader> MonitoredLogReader<R> {
    /// Wrap `inner` so that read metrics are recorded on every item.
    pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
        Self {
            inner,
            // No epoch observed yet.
            read_epoch: INVALID_EPOCH,
            metrics,
        }
    }
}
332
333impl<R: LogReader> LogReader for MonitoredLogReader<R> {
334    async fn init(&mut self) -> LogStoreResult<()> {
335        self.inner.init().instrument_await("log_reader_init").await
336    }
337
338    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
339        self.inner
340            .next_item()
341            .instrument_await("log_reader_next_item")
342            .await
343            .inspect(|(epoch, item)| {
344                if self.read_epoch != *epoch {
345                    self.read_epoch = *epoch;
346                    self.metrics.log_store_latest_read_epoch.set(*epoch as _);
347                }
348                if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
349                    self.metrics
350                        .log_store_read_rows
351                        .inc_by(chunk.cardinality() as _);
352                    self.metrics
353                        .log_store_read_bytes
354                        .inc_by(chunk.estimated_size() as u64);
355                }
356            })
357    }
358
359    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
360        self.inner.truncate(offset)
361    }
362
363    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
364        self.inner.rewind().instrument_await("log_reader_rewind")
365    }
366
367    fn start_from(
368        &mut self,
369        start_offset: Option<u64>,
370    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
371        self.inner.start_from(start_offset)
372    }
373}
374
// Offsets as reported by the wrapped (upstream) reader.
type UpstreamChunkOffset = TruncateOffset;
// Offsets as exposed to the consumer of the rate-limited (downstream) reader.
type DownstreamChunkOffset = TruncateOffset;

/// A chunk — or a piece of one after rate-limit splitting — together with the
/// upstream offset it originated from.
struct SplitChunk {
    chunk: StreamChunk,
    upstream_chunk_offset: UpstreamChunkOffset,
    // Indicate whether this is the last split chunk from the same `upstream_chunk_offset`
    is_last: bool,
}
384
/// State shared between reading and truncation in [`RateLimitedLogReader`].
struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    // Maps downstream offsets already emitted to the upstream offsets they cover.
    // Newer items at the front
    consumed_offset_queue: VecDeque<(DownstreamChunkOffset, UpstreamChunkOffset)>,
    // Pieces of split chunks not yet emitted downstream.
    // Newer items at the front
    unconsumed_chunk_queue: VecDeque<SplitChunk>,
    // Next downstream chunk id; reset to 0 at each barrier.
    next_chunk_id: usize,
}
393
impl<R: LogReader> RateLimitedLogReaderCore<R> {
    // Returns: Left - chunk, Right - Barrier/UpdateVnodeBitmap
    async fn next_item(&mut self) -> LogStoreResult<Either<SplitChunk, (u64, LogStoreReadItem)>> {
        // Get upstream chunk from unconsumed_chunk_queue first.
        // If unconsumed_chunk_queue is empty, get the chunk from the inner log reader.
        match self.unconsumed_chunk_queue.pop_back() {
            Some(split_chunk) => Ok(Either::Left(split_chunk)),
            None => {
                let (epoch, item) = self.inner.next_item().await?;
                match item {
                    LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                        Ok(Either::Left(SplitChunk {
                            chunk,
                            upstream_chunk_offset: UpstreamChunkOffset::Chunk { epoch, chunk_id },
                            // Not split yet, so this is trivially the last piece.
                            is_last: true,
                        }))
                    }
                    LogStoreReadItem::Barrier { .. } => {
                        // A barrier maps 1:1 between upstream and downstream offsets
                        // and is recorded as consumed immediately.
                        self.consumed_offset_queue.push_front((
                            TruncateOffset::Barrier { epoch },
                            TruncateOffset::Barrier { epoch },
                        ));
                        // Downstream chunk ids restart in the next epoch.
                        self.next_chunk_id = 0;
                        Ok(Either::Right((epoch, item)))
                    }
                }
            }
        }
    }
}
424
/// A [`LogReader`] adapter that throttles chunk emission according to a
/// dynamically updatable [`RateLimit`] received over `control_rx`.
pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    rate_limiter: RateLimiter,
    // Channel delivering rate-limit updates (including pause/resume).
    control_rx: UnboundedReceiver<RateLimit>,
}
430
impl<R: LogReader> RateLimitedLogReader<R> {
    /// Wrap `inner` with rate limiting controlled through `control_rx`.
    /// The limiter starts in the `Disabled` (unthrottled) state.
    pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
        Self {
            core: RateLimitedLogReaderCore {
                inner,
                consumed_offset_queue: VecDeque::new(),
                unconsumed_chunk_queue: VecDeque::new(),
                next_chunk_id: 0,
            },
            rate_limiter: RateLimiter::new(RateLimit::Disabled),
            control_rx,
        }
    }
}
445
impl<R: LogReader> RateLimitedLogReader<R> {
    /// Apply the configured rate limit to `split_chunk` — possibly splitting it into
    /// smaller pieces — and translate it into a downstream `(epoch, item)` pair.
    async fn apply_rate_limit(
        &mut self,
        split_chunk: SplitChunk,
    ) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let split_chunk = match self.rate_limiter.rate_limit() {
            RateLimit::Pause => unreachable!(
                "apply_rate_limit is not supposed to be called while the stream is paused"
            ),
            RateLimit::Disabled => split_chunk,
            RateLimit::Fixed(limit) => {
                let limit = limit.get();
                let required_permits = split_chunk.chunk.compute_rate_limit_chunk_permits();
                if required_permits <= limit {
                    // The whole chunk fits under the limit; just acquire permits.
                    self.rate_limiter.wait(required_permits).await;
                    split_chunk
                } else {
                    // Cut the chunk into smaller chunks
                    let mut chunks = split_chunk.chunk.split(limit as _).into_iter();
                    let mut is_last = split_chunk.is_last;
                    let upstream_chunk_offset = split_chunk.upstream_chunk_offset;

                    // The first chunk after splitting will be returned
                    let first_chunk = chunks.next().unwrap();

                    // The remaining chunks will be pushed to the queue
                    // (in reverse, so that `pop_back` later consumes them in order).
                    for chunk in chunks.rev() {
                        // The last chunk after splitting inherits the `is_last` from the original chunk
                        self.core.unconsumed_chunk_queue.push_back(SplitChunk {
                            chunk,
                            upstream_chunk_offset,
                            is_last,
                        });
                        is_last = false;
                    }

                    // Trigger rate limit and return the first chunk
                    self.rate_limiter
                        .wait(first_chunk.compute_rate_limit_chunk_permits())
                        .await;
                    SplitChunk {
                        chunk: first_chunk,
                        upstream_chunk_offset,
                        is_last,
                    }
                }
            }
        };

        // Update the consumed_offset_queue if the `split_chunk` is the last chunk of the upstream chunk
        let epoch = split_chunk.upstream_chunk_offset.epoch();
        let downstream_chunk_id = self.core.next_chunk_id;
        self.core.next_chunk_id += 1;
        if split_chunk.is_last {
            self.core.consumed_offset_queue.push_front((
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: downstream_chunk_id,
                },
                split_chunk.upstream_chunk_offset,
            ));
        }

        Ok((
            epoch,
            LogStoreReadItem::StreamChunk {
                chunk: split_chunk.chunk,
                chunk_id: downstream_chunk_id,
            },
        ))
    }
}
518
impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    /// Emit the next rate-limited item. While paused, only control messages are
    /// processed; data consumption stops until a non-pause limit arrives.
    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let mut paused = false;
        loop {
            select! {
                // `biased` gives rate-limit updates priority over data items.
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                    paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                // Data branch is disabled while paused.
                item = self.core.next_item(), if !paused => {
                    let item = item?;
                    match item {
                        Either::Left(split_chunk) => {
                            return self.apply_rate_limit(split_chunk).await;
                        },
                        Either::Right(item) => {
                            // The Right variant is only produced for barriers.
                            assert!(matches!(item.1, LogStoreReadItem::Barrier{..}));
                            return Ok(item);
                        },
                    }
                }
            }
        }
    }

    /// Translate a downstream truncate offset back into the newest fully-covered
    /// upstream offset and truncate the inner reader there.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        let mut truncate_offset = None;
        // Drain (oldest-first) all downstream offsets at or before `offset`.
        while let Some((downstream_offset, upstream_offset)) =
            self.core.consumed_offset_queue.back()
        {
            if *downstream_offset <= offset {
                truncate_offset = Some(*upstream_offset);
                self.core.consumed_offset_queue.pop_back();
            } else {
                break;
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset)
        } else {
            // Nothing newly covered upstream; no-op.
            Ok(())
        }
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Discard all split/offset bookkeeping before rewinding the inner reader.
        self.core.unconsumed_chunk_queue.clear();
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}
592
#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    /// Wrap the reader so that every stream chunk is passed through `f`.
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    /// Wrap the reader with read metrics and backpressure monitoring.
    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        // TODO: The `clone()` can be avoided if move backpressure inside `MonitoredLogReader`
        let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    /// Wrap the reader with dynamic rate limiting controlled via `control_rx`.
    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}
618
/// A [`LogWriter`] adapter that records write metrics.
pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}
623
/// Metrics recorded by [`MonitoredLogWriter`].
pub struct LogWriterMetrics {
    // Labels: [actor_id, sink_id, sink_name]
    pub log_store_first_write_epoch: LabelGuardedIntGauge,
    pub log_store_latest_write_epoch: LabelGuardedIntGauge,
    pub log_store_write_rows: LabelGuardedIntCounter,
}
630
631impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
632    async fn init(
633        &mut self,
634        epoch: EpochPair,
635        pause_read_on_bootstrap: bool,
636    ) -> LogStoreResult<()> {
637        self.metrics
638            .log_store_first_write_epoch
639            .set(epoch.curr as _);
640        self.metrics
641            .log_store_latest_write_epoch
642            .set(epoch.curr as _);
643        self.inner.init(epoch, pause_read_on_bootstrap).await
644    }
645
646    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
647        self.metrics
648            .log_store_write_rows
649            .inc_by(chunk.cardinality() as _);
650        self.inner.write_chunk(chunk).await
651    }
652
653    async fn flush_current_epoch(
654        &mut self,
655        next_epoch: u64,
656        options: FlushCurrentEpochOptions,
657    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
658        let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
659        self.metrics
660            .log_store_latest_write_epoch
661            .set(next_epoch as _);
662        Ok(post_flush)
663    }
664
665    fn pause(&mut self) -> LogStoreResult<()> {
666        self.inner.pause()
667    }
668
669    fn resume(&mut self) -> LogStoreResult<()> {
670        self.inner.resume()
671    }
672}
673
#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
    T: LogWriter + Sized,
{
    /// Wrap the writer so that write metrics are recorded.
    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
        MonitoredLogWriter {
            inner: self,
            metrics,
        }
    }
}
686
/// A pending item tracked by [`DeliveryFutureManager`], in log order.
enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        // earlier future at the front
        futures: VecDeque<F>,
    },
    Barrier,
}
695
/// Tracks in-flight delivery futures per chunk/barrier so that a truncate offset
/// can be derived once deliveries complete in order.
pub struct DeliveryFutureManager<F> {
    // Total number of in-flight futures across all items.
    future_count: usize,
    // Upper bound on in-flight futures before new additions must await.
    max_future_count: usize,
    // earlier items at the front
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}
702
703impl<F> DeliveryFutureManager<F> {
704    pub fn new(max_future_count: usize) -> Self {
705        Self {
706            future_count: 0,
707            max_future_count,
708            items: Default::default(),
709        }
710    }
711
712    pub fn add_barrier(&mut self, epoch: u64) {
713        if let Some((item_epoch, last_item)) = self.items.back() {
714            match last_item {
715                DeliveryFutureManagerItem::Chunk { .. } => {
716                    assert_eq!(*item_epoch, epoch)
717                }
718                DeliveryFutureManagerItem::Barrier => {
719                    assert!(
720                        epoch > *item_epoch,
721                        "new barrier epoch {} should be greater than prev barrier {}",
722                        epoch,
723                        item_epoch
724                    );
725                }
726            }
727        }
728        self.items
729            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
730    }
731
732    pub fn start_write_chunk(
733        &mut self,
734        epoch: u64,
735        chunk_id: ChunkId,
736    ) -> DeliveryFutureManagerAddFuture<'_, F> {
737        if let Some((item_epoch, item)) = self.items.back() {
738            match item {
739                DeliveryFutureManagerItem::Chunk {
740                    chunk_id: item_chunk_id,
741                    ..
742                } => {
743                    assert_eq!(epoch, *item_epoch);
744                    assert!(
745                        chunk_id > *item_chunk_id,
746                        "new chunk id {} should be greater than prev chunk id {}",
747                        chunk_id,
748                        item_chunk_id
749                    );
750                }
751                DeliveryFutureManagerItem::Barrier => {
752                    assert!(
753                        epoch > *item_epoch,
754                        "new chunk epoch {} should be greater than prev barrier: {}",
755                        epoch,
756                        item_epoch
757                    );
758                }
759            }
760        }
761        self.items.push_back((
762            epoch,
763            DeliveryFutureManagerItem::Chunk {
764                chunk_id,
765                futures: VecDeque::new(),
766            },
767        ));
768        DeliveryFutureManagerAddFuture(self)
769    }
770}
771
772pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);
773
774impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
775    /// Add a new future to the latest started written chunk.
776    /// The returned bool value indicate whether we have awaited on any previous futures.
777    pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
778        let mut has_await = false;
779        while self.0.future_count >= self.0.max_future_count {
780            self.await_one_delivery().await?;
781            has_await = true;
782        }
783        match self.0.items.back_mut() {
784            Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
785                futures.push_back(future);
786                self.0.future_count += 1;
787                Ok(has_await)
788            }
789            _ => unreachable!("should add future only after add a new chunk"),
790        }
791    }
792
793    pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
794        for (_, item) in &mut self.0.items {
795            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
796                && let Some(mut delivery_future) = futures.pop_front()
797            {
798                self.0.future_count -= 1;
799                return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
800            } else {
801                continue;
802            }
803        }
804        Ok(())
805    }
806
807    pub fn future_count(&self) -> usize {
808        self.0.future_count
809    }
810
811    pub fn max_future_count(&self) -> usize {
812        self.0.max_future_count
813    }
814}
815
impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
    /// Poll pending delivery futures in log order and resolve with the latest
    /// offset whose deliveries have all completed, i.e. the next safe truncate
    /// offset. Resolves with the error of the first failed delivery; stays
    /// pending while nothing new has completed.
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    // Cannot advance beyond an incomplete delivery.
                                    break 'outer;
                                }
                            }
                        }

                        // when we reach here, there must not be any pending or error future.
                        // Which means all futures of this stream chunk have been finished
                        assert!(futures.is_empty());
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        // Barrier will be yielded anyway
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                Poll::Pending
            }
        })
    }
}
867
868#[cfg(test)]
869mod tests {
870    use std::future::{Future, poll_fn};
871    use std::pin::pin;
872    use std::task::Poll;
873
874    use futures::{FutureExt, TryFuture};
875    use risingwave_common::util::epoch::test_epoch;
876    use tokio::sync::oneshot;
877    use tokio::sync::oneshot::Receiver;
878
879    use super::LogStoreResult;
880    use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};
881
882    #[test]
883    fn test_truncate_offset_cmp() {
884        assert!(
885            TruncateOffset::Barrier { epoch: 232 }
886                < TruncateOffset::Chunk {
887                    epoch: 233,
888                    chunk_id: 1
889                }
890        );
891        assert_eq!(
892            TruncateOffset::Chunk {
893                epoch: 1,
894                chunk_id: 1
895            },
896            TruncateOffset::Chunk {
897                epoch: 1,
898                chunk_id: 1
899            }
900        );
901        assert!(
902            TruncateOffset::Chunk {
903                epoch: 1,
904                chunk_id: 1
905            } < TruncateOffset::Chunk {
906                epoch: 1,
907                chunk_id: 2
908            }
909        );
910        assert!(
911            TruncateOffset::Barrier { epoch: 1 }
912                > TruncateOffset::Chunk {
913                    epoch: 1,
914                    chunk_id: 2
915                }
916        );
917        assert!(
918            TruncateOffset::Chunk {
919                epoch: 1,
920                chunk_id: 2
921            } < TruncateOffset::Barrier { epoch: 1 }
922        );
923        assert!(
924            TruncateOffset::Chunk {
925                epoch: 2,
926                chunk_id: 2
927            } > TruncateOffset::Barrier { epoch: 1 }
928        );
929        assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
930    }
931
932    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;
933
934    #[define_opaque(TestFuture)]
935    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
936        async move { rx.await.unwrap() }.boxed()
937    }
938
939    #[tokio::test]
940    async fn test_empty() {
941        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
942        let mut future = pin!(manager.next_truncate_offset());
943        assert!(
944            poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
945                .await
946                .is_pending()
947        );
948    }
949
950    #[tokio::test]
951    async fn test_future_delivery_manager_basic() {
952        let mut manager = DeliveryFutureManager::new(2);
953        let epoch1 = 233;
954        let chunk_id1 = 1;
955        let (tx1_1, rx1_1) = oneshot::channel();
956        let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
957        assert!(
958            !write_chunk
959                .add_future_may_await(to_test_future(rx1_1))
960                .await
961                .unwrap()
962        );
963        assert_eq!(manager.future_count, 1);
964        {
965            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
966            assert!(
967                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
968                    .await
969                    .is_pending()
970            );
971            tx1_1.send(Ok(())).unwrap();
972            assert_eq!(
973                next_truncate_offset.await.unwrap(),
974                TruncateOffset::Chunk {
975                    epoch: epoch1,
976                    chunk_id: chunk_id1
977                }
978            );
979        }
980        assert_eq!(manager.future_count, 0);
981        manager.add_barrier(epoch1);
982        assert_eq!(
983            manager.next_truncate_offset().await.unwrap(),
984            TruncateOffset::Barrier { epoch: epoch1 }
985        );
986    }
987
    // Verifies that when several consecutive chunks of the same epoch have all
    // their delivery futures finished, their offsets are "compressed": only the
    // latest finished chunk offset is yielded, and a barrier is always yielded
    // as its own offset even if later chunks have already finished.
    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let chunk_id3 = chunk_id2 + 1;
        // One oneshot channel per delivery future; tx side finishes the future.
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();
        // Register three chunks in epoch1, a barrier, then one chunk in epoch2.
        // The limit is 10, so none of these adds needs to await (returns false).
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id1)
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id2)
                .add_future_may_await(to_test_future(rx1_2))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id3)
                .add_future_may_await(to_test_future(rx1_3))
                .await
                .unwrap()
        );
        manager.add_barrier(epoch1);
        assert!(
            !manager
                .start_write_chunk(epoch2, chunk_id1)
                .add_future_may_await(to_test_future(rx2_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 4);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            // No future finished yet: pending.
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // Finishing only chunk2 is not enough: chunk1 (the queue head) is
            // still in flight, so the offset stream stays pending.
            tx1_2.send(Ok(())).unwrap();
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            // The offset of chunk1 and chunk2 are compressed
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // Finish the last epoch1 chunk and the epoch2 chunk together.
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
            // Emit barrier though later chunk has finished.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        // Only the already-finished epoch2 future remains; its offset is
        // yielded on the next call without further completions.
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }
1078
    // Verifies the backpressure path: with a future limit of 2, adding a third
    // delivery future must first await completion of the oldest one, and
    // `await_one_delivery` explicitly retires the oldest in-flight future.
    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        // One oneshot channel per delivery future; tx side finishes the future.
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            // Fill the manager exactly up to its limit of 2 with chunk1's
            // futures; neither add needs to await (returns false).
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
            {
                // Adding beyond the limit blocks until the oldest future
                // (rx1_1) finishes; the add then resolves to true (it awaited).
                let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_1.send(Ok(())).unwrap();
                assert!(future1.await.unwrap());
            }
            // Still 2 in flight: rx1_2 and the newly added rx2_1.
            assert_eq!(2, write_chunk.future_count());
            {
                // Same again: the next add must wait for rx1_2 to finish.
                let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_2.send(Ok(())).unwrap();
                assert!(future2.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                // Explicitly retire one delivery: blocks until rx2_1 finishes.
                let mut future3 = pin!(write_chunk.await_one_delivery());
                assert!(
                    poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx2_1.send(Ok(())).unwrap();
                future3.await.unwrap();
            }
            assert_eq!(1, write_chunk.future_count());
        }

        // Both chunk1 futures finished above, so chunk1's offset is ready.
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            // chunk2 still has rx2_2 in flight; finishing it yields its offset.
            let mut future = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                future.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }
1173}