risingwave_connector/sink/
log_store.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::VecDeque;
17use std::fmt::Debug;
18use std::future::{Future, pending, poll_fn};
19use std::pin::pin;
20use std::sync::Arc;
21use std::task::Poll;
22use std::time::Instant;
23
24use await_tree::InstrumentAwait;
25use futures::future::BoxFuture;
26use futures::{TryFuture, TryFutureExt};
27use risingwave_common::array::StreamChunk;
28use risingwave_common::bail;
29use risingwave_common::bitmap::Bitmap;
30use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
31use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
32use risingwave_common_estimate_size::EstimateSize;
33use risingwave_common_rate_limit::{RateLimit, RateLimiter};
34use risingwave_pb::stream_plan::PbSinkSchemaChange;
35use tokio::select;
36use tokio::sync::mpsc::UnboundedReceiver;
37
38pub type LogStoreResult<T> = Result<T, anyhow::Error>;
39pub type ChunkId = usize;
40
/// Position in the log store up to which (inclusive) data may be truncated.
///
/// Ordering: offsets compare first by epoch; within one epoch, chunks compare
/// by `chunk_id` and a barrier sorts after every chunk (see the `Ord` impl).
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum TruncateOffset {
    /// Offset of the stream chunk identified by `chunk_id` within `epoch`.
    Chunk { epoch: u64, chunk_id: ChunkId },
    /// Offset of the barrier that closes `epoch`.
    Barrier { epoch: u64 },
}
46
47impl Ord for TruncateOffset {
48    fn cmp(&self, other: &Self) -> Ordering {
49        let extract = |offset: &TruncateOffset| match offset {
50            TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
51            TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
52        };
53        extract(self).cmp(&extract(other))
54    }
55}
56
impl PartialOrd for TruncateOffset {
    // Mandatory delegation to the total order defined in `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
62
63impl TruncateOffset {
64    pub fn next_chunk_id(&self) -> ChunkId {
65        match self {
66            TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
67            TruncateOffset::Barrier { .. } => 0,
68        }
69    }
70
71    pub fn epoch(&self) -> u64 {
72        match self {
73            TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
74        }
75    }
76
77    pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
78        if *self >= next_offset {
79            bail!(
80                "next offset {:?} should be later than current offset {:?}",
81                next_offset,
82                self
83            )
84        } else {
85            Ok(())
86        }
87    }
88
89    pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
90        match self {
91            TruncateOffset::Chunk {
92                epoch: offset_epoch,
93                ..
94            } => {
95                if epoch != *offset_epoch {
96                    bail!(
97                        "new item epoch {} does not match current chunk offset epoch {}",
98                        epoch,
99                        offset_epoch
100                    );
101                }
102            }
103            TruncateOffset::Barrier {
104                epoch: offset_epoch,
105            } => {
106                if epoch <= *offset_epoch {
107                    bail!(
108                        "new item epoch {} does not exceed barrier offset epoch {}",
109                        epoch,
110                        offset_epoch
111                    );
112                }
113            }
114        }
115        Ok(())
116    }
117}
118
/// An item emitted by a [`LogReader`].
#[derive(Debug)]
pub enum LogStoreReadItem {
    StreamChunk {
        chunk: StreamChunk,
        // Identifier of the chunk within its epoch; consumers build
        // `TruncateOffset::Chunk` from it when acknowledging consumption.
        chunk_id: ChunkId,
    },
    Barrier {
        is_checkpoint: bool,
        // New vnode bitmap carried by this barrier, if any.
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        // NOTE(review): presumably marks the barrier that stops the stream —
        // confirm against the writer side.
        is_stop: bool,
        schema_change: Option<PbSinkSchemaChange>,
    },
}
132
/// Callback signature for the deferred work after an epoch flush; invoking it
/// produces the future that completes that work.
pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;

/// Handle returned by [`LogWriter::flush_current_epoch`]. The caller must
/// invoke [`Self::post_yield_barrier`] to run the deferred post-flush step
/// (`#[must_use]` enforces that it is not silently dropped).
#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);
139
impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
    /// Wrap a post-flush callback into the handle.
    pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
        Self(Box::new(f))
    }

    /// Consume the handle and run the deferred post-flush callback.
    pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
        self.0().await
    }
}
149
/// Options for [`LogWriter::flush_current_epoch`], describing the barrier that
/// ends the flushed epoch; the fields mirror [`LogStoreReadItem::Barrier`].
pub struct FlushCurrentEpochOptions {
    pub is_checkpoint: bool,
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    pub is_stop: bool,
    pub schema_change: Option<PbSinkSchemaChange>,
}
156
/// Writing side of a sink log store.
pub trait LogWriter: Send {
    /// Initialize the log writer with an epoch
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Write a stream chunk to the log writer
    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Mark current epoch as finished and sealed, and flush the unconsumed log data.
    /// The returned [`LogWriterPostFlushCurrentEpoch`] must be invoked afterwards.
    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    /// Pause the log store; counterpart of [`Self::resume`]. Exact semantics
    /// are implementation-defined.
    fn pause(&mut self) -> LogStoreResult<()>;

    /// Resume a previously paused log store.
    fn resume(&mut self) -> LogStoreResult<()>;
}
182
/// Reading side of a sink log store.
pub trait LogReader: Send + Sized + 'static {
    /// Initialize the log reader. Usually function as waiting for log writer to be initialized.
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Consume log store from given `start_offset` or aligned start offset recorded previously.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    /// Reset the log reader to after the latest truncate offset
    ///
    /// NOTE(review): an older doc said "the return flag means whether the log
    /// store supports rewind", but the signature returns `()`; support is now
    /// declared via `LogStoreFactory::ALLOW_REWIND`.
    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}
209
/// Factory producing a connected reader/writer pair for one log store.
pub trait LogStoreFactory: Send + 'static {
    /// Whether the produced reader supports [`LogReader::rewind`].
    const ALLOW_REWIND: bool;
    // NOTE(review): presumably forces rebuilding the sink when the vnode
    // bitmap changes — confirm against the sink executor.
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    /// Build the reader and writer backed by the same log store.
    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
218
/// [`LogReader`] adapter that applies `f` to the payload of every stream chunk
/// emitted by `inner`; all other items pass through unchanged.
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}
223
/// [`LogReader`] adapter that remembers the barriers it has emitted and, on
/// `truncate`, issues explicit barrier truncations to the inner reader for
/// every pending barrier at or before the requested offset.
pub struct TruncateBarrierLogReader<R: LogReader> {
    inner: R,
    /// Epochs of barriers emitted but not yet truncated; oldest at the front.
    pending_barriers: VecDeque<u64>,
}
228
229impl<R: LogReader> TruncateBarrierLogReader<R> {
230    pub fn new(inner: R) -> Self {
231        Self {
232            inner,
233            pending_barriers: VecDeque::new(),
234        }
235    }
236}
237
238impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
239    for TransformChunkLogReader<F, R>
240{
241    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
242        self.inner.init()
243    }
244
245    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
246        let (epoch, item) = self.inner.next_item().await?;
247        let item = match item {
248            LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
249                chunk: (self.f)(chunk),
250                chunk_id,
251            },
252            other => other,
253        };
254        Ok((epoch, item))
255    }
256
257    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
258        self.inner.truncate(offset)
259    }
260
261    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
262        self.inner.rewind()
263    }
264
265    fn start_from(
266        &mut self,
267        start_offset: Option<u64>,
268    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
269        self.inner.start_from(start_offset)
270    }
271}
272
impl<R: LogReader> LogReader for TruncateBarrierLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        // Drop stale barrier bookkeeping from any previous run.
        self.pending_barriers.clear();
        self.inner.init().await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let (epoch, item) = self.inner.next_item().await?;
        // Remember every emitted barrier so `truncate` can replay it to the
        // inner reader as an explicit barrier truncation.
        if matches!(item, LogStoreReadItem::Barrier { .. }) {
            self.pending_barriers.push_back(epoch);
        }
        Ok((epoch, item))
    }

    /// Forward `offset` to the inner reader, first issuing an explicit barrier
    /// truncation for every pending barrier strictly before `offset`.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        while let Some(front_epoch) = self.pending_barriers.front().copied() {
            match (TruncateOffset::Barrier { epoch: front_epoch }).cmp(&offset) {
                Ordering::Less => {
                    // Pending barrier lies before `offset`: truncate to it
                    // explicitly and keep scanning.
                    self.inner
                        .truncate(TruncateOffset::Barrier { epoch: front_epoch })?;
                    self.pending_barriers.pop_front();
                }
                Ordering::Equal => {
                    // `offset` is exactly this barrier: forward it and stop.
                    self.inner.truncate(offset)?;
                    self.pending_barriers.pop_front();
                    return Ok(());
                }
                Ordering::Greater => {
                    // `offset` is before the next pending barrier: forward it
                    // and keep that barrier pending.
                    self.inner.truncate(offset)?;
                    return Ok(());
                }
            }
        }
        // No pending barriers remain: forward `offset` directly.
        self.inner.truncate(offset)?;
        Ok(())
    }

    async fn rewind(&mut self) -> LogStoreResult<()> {
        self.pending_barriers.clear();
        self.inner.rewind().await
    }

    async fn start_from(&mut self, start_offset: Option<u64>) -> LogStoreResult<()> {
        self.pending_barriers.clear();
        self.inner.start_from(start_offset).await
    }
}
320
/// [`LogReader`] adapter measuring backpressure: the time between one
/// `next_item` future resolving and the next `next_item` call being made.
pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    /// Start time to wait for new future after poll ready
    wait_new_future_start_time: Option<Instant>,
    /// Accumulated wait time in nanoseconds.
    wait_new_future_duration_ns: LabelGuardedIntCounter,
}
327
328impl<R: LogReader> BackpressureMonitoredLogReader<R> {
329    fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter) -> Self {
330        Self {
331            inner,
332            wait_new_future_start_time: None,
333            wait_new_future_duration_ns,
334        }
335    }
336}
337
impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Disarm the timer so time spent before the first item is not counted.
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        // Time elapsed since the previous item resolved is the time the
        // downstream spent before asking for a new one, i.e. backpressure.
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        // The closure borrows only `wait_new_future_start_time`, disjoint from
        // the `inner` borrow, so both mutable borrows are accepted.
        self.inner.next_item().inspect_ok(|_| {
            // Set start time when return ready
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            // A rewind restarts consumption; don't count the gap across it.
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}
374
/// [`LogReader`] adapter that reports read metrics (latest epoch, rows, bytes).
pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    /// Last epoch observed from `next_item`; used to deduplicate gauge updates.
    read_epoch: u64,
    metrics: LogReaderMetrics,
}
380
/// Metric handles used by [`MonitoredLogReader`] and
/// [`BackpressureMonitoredLogReader`].
pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge,
    pub log_store_read_rows: LabelGuardedIntCounter,
    pub log_store_read_bytes: LabelGuardedIntCounter,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter,
}
387
388impl<R: LogReader> MonitoredLogReader<R> {
389    pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
390        Self {
391            inner,
392            read_epoch: INVALID_EPOCH,
393            metrics,
394        }
395    }
396}
397
398impl<R: LogReader> LogReader for MonitoredLogReader<R> {
399    async fn init(&mut self) -> LogStoreResult<()> {
400        self.inner.init().instrument_await("log_reader_init").await
401    }
402
403    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
404        self.inner
405            .next_item()
406            .instrument_await("log_reader_next_item")
407            .await
408            .inspect(|(epoch, item)| {
409                if self.read_epoch != *epoch {
410                    self.read_epoch = *epoch;
411                    self.metrics.log_store_latest_read_epoch.set(*epoch as _);
412                }
413                if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
414                    self.metrics
415                        .log_store_read_rows
416                        .inc_by(chunk.cardinality() as _);
417                    self.metrics
418                        .log_store_read_bytes
419                        .inc_by(chunk.estimated_size() as u64);
420                }
421            })
422    }
423
424    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
425        self.inner.truncate(offset)
426    }
427
428    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
429        self.inner.rewind().instrument_await("log_reader_rewind")
430    }
431
432    fn start_from(
433        &mut self,
434        start_offset: Option<u64>,
435    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
436        self.inner.start_from(start_offset)
437    }
438}
439
/// Offset in the coordinate system of the wrapped (upstream) reader.
#[derive(Copy, Clone, PartialOrd, PartialEq, Debug)]
struct UpstreamChunkOffset(TruncateOffset);
/// Offset in the coordinate system exposed to the consumer, where chunk ids
/// are re-assigned after chunks are split for rate limiting.
#[derive(Copy, Clone, PartialOrd, PartialEq)]
struct DownstreamChunkOffset(TruncateOffset);
444
/// State shared by [`RateLimitedLogReader`]: maps upstream items to the
/// (possibly split) downstream items handed to the consumer.
struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    /// The upstream chunk currently being split and re-emitted, if any: its
    /// upstream offset, the downstream offsets already handed out for it, and
    /// the remaining split chunks (consumed from the back of the `Vec`).
    consuming_chunk: Option<(
        UpstreamChunkOffset,
        // Newer items at the front, push_front, pop_back
        VecDeque<DownstreamChunkOffset>,
        Vec<StreamChunk>, // split chunks
    )>,
    // Newer items at the front, push_front, pop_back
    /// Fully emitted upstream items with the downstream offsets they map to,
    /// awaiting truncation.
    consumed_offset_queue: VecDeque<(UpstreamChunkOffset, VecDeque<DownstreamChunkOffset>)>,
    /// Next downstream chunk id to assign.
    next_chunk_id: usize,
    rate_limiter: RateLimiter,
}
458
/// [`LogReader`] adapter that throttles chunk emission according to a rate
/// limit adjustable at runtime through `control_rx`.
pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    control_rx: UnboundedReceiver<RateLimit>,
}
463
464impl<R: LogReader> RateLimitedLogReader<R> {
465    pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
466        Self {
467            core: RateLimitedLogReaderCore {
468                inner,
469                consuming_chunk: None,
470                consumed_offset_queue: VecDeque::new(),
471                next_chunk_id: 0,
472                rate_limiter: RateLimiter::new(RateLimit::Disabled),
473            },
474            control_rx,
475        }
476    }
477}
478
impl<R: LogReader> RateLimitedLogReaderCore<R> {
    /// Peek the split chunk that would be emitted next, if any.
    fn peek_next_pending_chunk(&self) -> Option<&StreamChunk> {
        self.consuming_chunk
            .as_ref()
            .and_then(|(_, _, chunk)| chunk.last())
    }

    /// Pop the next split chunk of the chunk under consumption, assigning it a
    /// fresh downstream chunk id. Once the last split is consumed, the whole
    /// upstream chunk moves to `consumed_offset_queue`.
    fn consume_next_pending_chunk(&mut self) -> Option<(u64, StreamChunk, ChunkId)> {
        let Some((upstream_offset, consumed_offsets, pending_chunk)) = &mut self.consuming_chunk
        else {
            return None;
        };
        let epoch = upstream_offset.0.epoch();

        // Split chunks are stored reversed, so `pop` yields them in order.
        let item = pending_chunk.pop().map(|chunk| {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            consumed_offsets.push_front(DownstreamChunkOffset(TruncateOffset::Chunk {
                epoch,
                chunk_id,
            }));
            (epoch, chunk, chunk_id)
        });
        if pending_chunk.is_empty() {
            let (upstream_offset, consumed_offsets, _) =
                self.consuming_chunk.take().expect("checked some");
            self.consumed_offset_queue
                .push_front((upstream_offset, consumed_offsets));
        }
        item
    }

    /// Record an upstream item that is emitted without splitting. For chunks,
    /// the upstream chunk id is rewritten to downstream numbering; the
    /// upstream→downstream offset mapping is queued for later truncation.
    fn consume_single_upstream_item(
        &mut self,
        epoch: u64,
        mut item: LogStoreReadItem,
    ) -> (u64, LogStoreReadItem) {
        assert!(self.consuming_chunk.is_none());
        let (upstream_offset, downstream_offset) = match &mut item {
            LogStoreReadItem::StreamChunk { chunk_id, .. } => {
                let upstream_chunk_id = *chunk_id;
                let downstream_chunk_id = self.next_chunk_id;
                self.next_chunk_id += 1;
                *chunk_id = downstream_chunk_id;
                (
                    UpstreamChunkOffset(TruncateOffset::Chunk {
                        epoch,
                        chunk_id: upstream_chunk_id,
                    }),
                    DownstreamChunkOffset(TruncateOffset::Chunk {
                        epoch,
                        chunk_id: downstream_chunk_id,
                    }),
                )
            }
            LogStoreReadItem::Barrier { .. } => (
                UpstreamChunkOffset(TruncateOffset::Barrier { epoch }),
                DownstreamChunkOffset(TruncateOffset::Barrier { epoch }),
            ),
        };
        self.consumed_offset_queue
            .push_front((upstream_offset, VecDeque::from_iter([downstream_offset])));
        (epoch, item)
    }

    /// Emit the next item subject to the current rate limit:
    /// - `Pause`: never resolves (the caller races this future against
    ///   control-channel updates).
    /// - `Disabled`: pass items through, draining leftover split chunks first.
    /// - `Fixed(limit)`: split oversized chunks to the limit and wait for
    ///   permits before emitting each piece; barriers pass through directly.
    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        match self.rate_limiter.rate_limit() {
            RateLimit::Pause => pending().await,
            RateLimit::Disabled => {
                if let Some((epoch, chunk, chunk_id)) = self.consume_next_pending_chunk() {
                    Ok((epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }))
                } else {
                    let (epoch, item) = self.inner.next_item().await?;
                    Ok(self.consume_single_upstream_item(epoch, item))
                }
            }
            RateLimit::Fixed(limit) => {
                if self.peek_next_pending_chunk().is_none() {
                    let (epoch, item) = self.inner.next_item().await?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                            // Chunks within the permit budget stay whole;
                            // larger ones are split to at most `limit` permits.
                            let chunks = if chunk.rate_limit_permits() < limit.get() {
                                vec![chunk]
                            } else {
                                let mut chunks = chunk.split(limit.get() as _);
                                // reverse to make the first chunk to be popped first
                                chunks.reverse();
                                chunks
                            };
                            assert!(!chunks.is_empty());

                            assert!(
                                self.consuming_chunk
                                    .replace((
                                        UpstreamChunkOffset(TruncateOffset::Chunk {
                                            epoch,
                                            chunk_id
                                        }),
                                        VecDeque::new(),
                                        chunks,
                                    ))
                                    .is_none()
                            );
                        }
                        item @ LogStoreReadItem::Barrier { .. } => {
                            return Ok(self.consume_single_upstream_item(epoch, item));
                        }
                    };
                }
                // A pending split chunk is guaranteed by the branch above.
                let chunk = self.peek_next_pending_chunk().expect("must Some");
                self.rate_limiter.wait_chunk(chunk).await;
                let (epoch, chunk, chunk_id) =
                    self.consume_next_pending_chunk().expect("must Some");
                Ok((epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }))
            }
        }
    }
}
597
impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    /// Emit the next item while concurrently listening for rate-limit updates;
    /// control messages take priority (`biased`) over item emission.
    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        loop {
            select! {
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.core.rate_limiter.update(new_rate_limit);
                    let paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                item = self.core.next_item() => {
                    return item;
                }
            }
        }
    }

    /// Translate a downstream truncate offset back into upstream terms: drop
    /// every recorded downstream offset at or before `offset`, and truncate
    /// the inner reader to the newest upstream item whose downstream offsets
    /// are all consumed.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        let downstream_offset = DownstreamChunkOffset(offset);
        let mut truncate_offset = None;
        let mut stop = false;
        // Walk fully-emitted upstream items from oldest (queue back) to newest.
        'outer: while let Some((upstream_offset, downstream_offsets)) =
            self.core.consumed_offset_queue.back_mut()
        {
            while let Some(prev_downstream_offset) = downstream_offsets.back() {
                if *prev_downstream_offset <= downstream_offset {
                    downstream_offsets.pop_back();
                } else {
                    // This upstream item is only partially consumed; nothing
                    // newer can be truncated either.
                    stop = true;
                    break 'outer;
                }
            }
            // Every downstream offset of this upstream item is consumed.
            truncate_offset = Some(*upstream_offset);
            self.core.consumed_offset_queue.pop_back();
        }
        // Also drop consumed downstream offsets of the chunk still being
        // split; its upstream offset stays pending until fully emitted.
        if !stop && let Some((_, downstream_offsets, _)) = &mut self.core.consuming_chunk {
            while let Some(prev_downstream_offset) = downstream_offsets.back() {
                if *prev_downstream_offset <= downstream_offset {
                    downstream_offsets.pop_back();
                } else {
                    // stop = true;
                    break;
                }
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset.0)
        } else {
            Ok(())
        }
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Discard all splitting/offset-mapping state; downstream chunk ids
        // restart from 0 after a rewind.
        self.core.consuming_chunk = None;
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}
677
#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    /// Wrap `self` so that every emitted stream chunk is passed through `f`.
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    /// Wrap `self` with read metrics and backpressure monitoring.
    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        // TODO: The `clone()` can be avoided if move backpressure inside `MonitoredLogReader`
        let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    /// Wrap `self` with runtime-adjustable rate limiting
    /// (see [`RateLimitedLogReader`]).
    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}
703
/// [`LogWriter`] adapter that reports write metrics.
pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}
708
/// Metric handles used by [`MonitoredLogWriter`].
pub struct LogWriterMetrics {
    // Labels: [actor_id, sink_id, sink_name]
    pub log_store_first_write_epoch: LabelGuardedIntGauge,
    pub log_store_latest_write_epoch: LabelGuardedIntGauge,
    pub log_store_write_rows: LabelGuardedIntCounter,
}
715
716impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
717    async fn init(
718        &mut self,
719        epoch: EpochPair,
720        pause_read_on_bootstrap: bool,
721    ) -> LogStoreResult<()> {
722        self.metrics
723            .log_store_first_write_epoch
724            .set(epoch.curr as _);
725        self.metrics
726            .log_store_latest_write_epoch
727            .set(epoch.curr as _);
728        self.inner.init(epoch, pause_read_on_bootstrap).await
729    }
730
731    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
732        self.metrics
733            .log_store_write_rows
734            .inc_by(chunk.cardinality() as _);
735        self.inner.write_chunk(chunk).await
736    }
737
738    async fn flush_current_epoch(
739        &mut self,
740        next_epoch: u64,
741        options: FlushCurrentEpochOptions,
742    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
743        let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
744        self.metrics
745            .log_store_latest_write_epoch
746            .set(next_epoch as _);
747        Ok(post_flush)
748    }
749
750    fn pause(&mut self) -> LogStoreResult<()> {
751        self.inner.pause()
752    }
753
754    fn resume(&mut self) -> LogStoreResult<()> {
755        self.inner.resume()
756    }
757}
758
#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
    T: LogWriter + Sized,
{
    /// Wrap `self` so that writes and epoch flushes update `metrics`.
    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
        MonitoredLogWriter {
            inner: self,
            metrics,
        }
    }
}
771
/// A unit tracked by [`DeliveryFutureManager`]: either a written chunk with
/// its outstanding delivery futures, or a barrier.
enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        // earlier future at the front
        futures: VecDeque<F>,
    },
    Barrier,
}
780
/// Tracks the in-flight delivery futures attached to written chunks and
/// barriers, bounding how many may be outstanding at once.
pub struct DeliveryFutureManager<F> {
    /// Total number of queued futures across all items.
    future_count: usize,
    /// Cap on `future_count`; reaching it makes adding a future await earlier
    /// deliveries first.
    max_future_count: usize,
    // earlier items at the front
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}
787
788impl<F> DeliveryFutureManager<F> {
789    pub fn new(max_future_count: usize) -> Self {
790        Self {
791            future_count: 0,
792            max_future_count,
793            items: Default::default(),
794        }
795    }
796
797    pub fn add_barrier(&mut self, epoch: u64) {
798        if let Some((item_epoch, last_item)) = self.items.back() {
799            match last_item {
800                DeliveryFutureManagerItem::Chunk { .. } => {
801                    assert_eq!(*item_epoch, epoch)
802                }
803                DeliveryFutureManagerItem::Barrier => {
804                    assert!(
805                        epoch > *item_epoch,
806                        "new barrier epoch {} should be greater than prev barrier {}",
807                        epoch,
808                        item_epoch
809                    );
810                }
811            }
812        }
813        self.items
814            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
815    }
816
817    pub fn start_write_chunk(
818        &mut self,
819        epoch: u64,
820        chunk_id: ChunkId,
821    ) -> DeliveryFutureManagerAddFuture<'_, F> {
822        if let Some((item_epoch, item)) = self.items.back() {
823            match item {
824                DeliveryFutureManagerItem::Chunk {
825                    chunk_id: item_chunk_id,
826                    ..
827                } => {
828                    assert_eq!(epoch, *item_epoch);
829                    assert!(
830                        chunk_id > *item_chunk_id,
831                        "new chunk id {} should be greater than prev chunk id {}",
832                        chunk_id,
833                        item_chunk_id
834                    );
835                }
836                DeliveryFutureManagerItem::Barrier => {
837                    assert!(
838                        epoch > *item_epoch,
839                        "new chunk epoch {} should be greater than prev barrier: {}",
840                        epoch,
841                        item_epoch
842                    );
843                }
844            }
845        }
846        self.items.push_back((
847            epoch,
848            DeliveryFutureManagerItem::Chunk {
849                chunk_id,
850                futures: VecDeque::new(),
851            },
852        ));
853        DeliveryFutureManagerAddFuture(self)
854    }
855}
856
/// Handle for attaching delivery futures to the most recently started chunk of
/// a [`DeliveryFutureManager`].
pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);
858
859impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
860    /// Add a new future to the latest started written chunk.
861    /// The returned bool value indicate whether we have awaited on any previous futures.
862    pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
863        let mut has_await = false;
864        while self.0.future_count >= self.0.max_future_count {
865            self.await_one_delivery().await?;
866            has_await = true;
867        }
868        match self.0.items.back_mut() {
869            Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
870                futures.push_back(future);
871                self.0.future_count += 1;
872                Ok(has_await)
873            }
874            _ => unreachable!("should add future only after add a new chunk"),
875        }
876    }
877
878    pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
879        for (_, item) in &mut self.0.items {
880            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
881                && let Some(mut delivery_future) = futures.pop_front()
882            {
883                self.0.future_count -= 1;
884                return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
885            } else {
886                continue;
887            }
888        }
889        Ok(())
890    }
891
892    pub fn future_count(&self) -> usize {
893        self.0.future_count
894    }
895
896    pub fn max_future_count(&self) -> usize {
897        self.0.max_future_count
898    }
899}
900
impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
    /// Poll delivery futures in item order and resolve to the furthest offset
    /// that is now safe to truncate to.
    ///
    /// A chunk offset is only reported after every future of that chunk has
    /// resolved `Ok(())`; consecutive finished chunks are compressed so only
    /// the latest offset is yielded. A barrier is always yielded on its own:
    /// the loop breaks right after popping it, even if later items have also
    /// finished. The first delivery error encountered is returned immediately.
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            // Tracks the newest offset whose deliveries all completed in this poll.
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        // Drain this chunk's futures front-to-back; stop at the
                        // first one that is still pending.
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        // Fail fast on the first delivery error.
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    // `try_poll_unpin` has registered the waker on
                                    // this pending future; yield what we have so far.
                                    break 'outer;
                                }
                            }
                        }

                        // when we reach here, there must not be any pending or error future.
                        // Which means all futures of this stream chunk have been finished
                        assert!(futures.is_empty());
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        // Barrier will be yielded anyway
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                // NOTE: when `items` is empty no future was polled, so no waker
                // is registered; callers are expected to add items and re-poll
                // (the tests poll this future manually in that case).
                Poll::Pending
            }
        })
    }
}
952
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::task::Poll;

    use futures::{FutureExt, TryFuture};
    use risingwave_common::array::StreamChunk;
    use risingwave_common::util::epoch::test_epoch;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::Receiver;

    use super::{LogReader, LogStoreReadItem, LogStoreResult, TruncateBarrierLogReader};
    use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};

    /// In-memory `LogReader` that replays a fixed sequence of items and
    /// records every `truncate` call so tests can assert on the order and
    /// offsets of truncations forwarded by wrappers.
    struct MockLogReader {
        // Items yielded by `next_item`, front to back.
        items: VecDeque<(u64, LogStoreReadItem)>,
        // Every offset passed to `truncate`, in call order.
        truncate_calls: Vec<TruncateOffset>,
    }

    impl MockLogReader {
        fn new(items: impl IntoIterator<Item = (u64, LogStoreReadItem)>) -> Self {
            Self {
                items: items.into_iter().collect(),
                truncate_calls: Vec::new(),
            }
        }
    }

    impl LogReader for MockLogReader {
        async fn init(&mut self) -> LogStoreResult<()> {
            Ok(())
        }

        async fn start_from(&mut self, _start_offset: Option<u64>) -> LogStoreResult<()> {
            Ok(())
        }

        async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
            // Errors once the scripted items are exhausted.
            self.items
                .pop_front()
                .ok_or_else(|| anyhow::anyhow!("no item"))
        }

        fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
            self.truncate_calls.push(offset);
            Ok(())
        }

        async fn rewind(&mut self) -> LogStoreResult<()> {
            Ok(())
        }
    }

    /// `TruncateOffset` ordering: compared by epoch first, and within an
    /// epoch a barrier sorts after every chunk (chunk_id is `usize::MAX`).
    #[test]
    fn test_truncate_offset_cmp() {
        assert!(
            TruncateOffset::Barrier { epoch: 232 }
                < TruncateOffset::Chunk {
                    epoch: 233,
                    chunk_id: 1
                }
        );
        assert_eq!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            },
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            } < TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            }
        );
        assert!(
            TruncateOffset::Barrier { epoch: 1 }
                > TruncateOffset::Chunk {
                    epoch: 1,
                    chunk_id: 2
                }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            } < TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 2,
                chunk_id: 2
            } > TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
    }

    // Opaque boxed future driven by a oneshot channel, so tests control
    // exactly when each "delivery" completes.
    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;

    #[define_opaque(TestFuture)]
    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
        async move { rx.await.unwrap() }.boxed()
    }

    /// With no items added, `next_truncate_offset` must stay pending.
    #[tokio::test]
    async fn test_empty() {
        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
        let mut future = pin!(manager.next_truncate_offset());
        assert!(
            poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
    }

    /// One chunk with one future: pending until the future resolves, then the
    /// chunk offset is yielded; a barrier yields its own offset afterwards.
    #[tokio::test]
    async fn test_future_delivery_manager_basic() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch1 = 233;
        let chunk_id1 = 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
        assert!(
            !write_chunk
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 1);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id1
                }
            );
        }
        assert_eq!(manager.future_count, 0);
        manager.add_barrier(epoch1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: epoch1 }
        );
    }

    /// Consecutive finished chunks are compressed into the latest offset, but
    /// a barrier is always yielded alone even when later items are finished.
    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let chunk_id3 = chunk_id2 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id1)
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id2)
                .add_future_may_await(to_test_future(rx1_2))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id3)
                .add_future_may_await(to_test_future(rx1_3))
                .await
                .unwrap()
        );
        manager.add_barrier(epoch1);
        assert!(
            !manager
                .start_write_chunk(epoch2, chunk_id1)
                .add_future_may_await(to_test_future(rx2_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 4);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // Completing chunk2 alone is not enough: chunk1 still blocks the front.
            tx1_2.send(Ok(())).unwrap();
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            // The offset of chunk1 and chunk2 are compressed
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
            // Emit barrier though later chunk has finished.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }

    /// With `max_future_count == 2`, adding a third future must first await an
    /// earlier delivery (`add_future_may_await` returns `true`), and
    /// `await_one_delivery` consumes exactly one pending future.
    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
            {
                // Cap reached: adding blocks until an earlier future (rx1_1) resolves.
                let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_1.send(Ok(())).unwrap();
                assert!(future1.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_2.send(Ok(())).unwrap();
                assert!(future2.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                // Explicitly await one delivery; completes once tx2_1 resolves.
                let mut future3 = pin!(write_chunk.await_one_delivery());
                assert!(
                    poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx2_1.send(Ok(())).unwrap();
                future3.await.unwrap();
            }
            assert_eq!(1, write_chunk.future_count());
        }

        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            let mut future = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                future.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }

    /// Truncating at a later barrier forwards truncations for every earlier
    /// pending barrier first, in epoch order.
    #[tokio::test]
    async fn test_truncate_barrier_reader_truncates_previous_barriers_first() {
        let inner = MockLogReader::new([
            (
                1,
                LogStoreReadItem::Barrier {
                    is_checkpoint: false,
                    new_vnode_bitmap: None,
                    is_stop: false,
                    schema_change: None,
                },
            ),
            (
                2,
                LogStoreReadItem::Barrier {
                    is_checkpoint: false,
                    new_vnode_bitmap: None,
                    is_stop: false,
                    schema_change: None,
                },
            ),
        ]);
        let mut reader = TruncateBarrierLogReader::new(inner);

        reader.init().await.unwrap();
        reader.start_from(None).await.unwrap();
        reader.next_item().await.unwrap();
        reader.next_item().await.unwrap();
        reader
            .truncate(TruncateOffset::Barrier { epoch: 2 })
            .unwrap();

        assert_eq!(
            reader.inner.truncate_calls,
            vec![
                TruncateOffset::Barrier { epoch: 1 },
                TruncateOffset::Barrier { epoch: 2 },
            ]
        );
    }

    /// Truncating at a chunk flushes barriers strictly before that chunk, but
    /// keeps the same-epoch barrier pending until it is truncated explicitly.
    #[tokio::test]
    async fn test_truncate_barrier_reader_keeps_later_barrier_pending() {
        let inner = MockLogReader::new([
            (
                1,
                LogStoreReadItem::Barrier {
                    is_checkpoint: false,
                    new_vnode_bitmap: None,
                    is_stop: false,
                    schema_change: None,
                },
            ),
            (
                2,
                LogStoreReadItem::StreamChunk {
                    chunk: StreamChunk::default(),
                    chunk_id: 0,
                },
            ),
            (
                2,
                LogStoreReadItem::Barrier {
                    is_checkpoint: false,
                    new_vnode_bitmap: None,
                    is_stop: false,
                    schema_change: None,
                },
            ),
        ]);
        let mut reader = TruncateBarrierLogReader::new(inner);

        reader.init().await.unwrap();
        reader.start_from(None).await.unwrap();
        reader.next_item().await.unwrap();
        reader.next_item().await.unwrap();
        reader.next_item().await.unwrap();
        reader
            .truncate(TruncateOffset::Chunk {
                epoch: 2,
                chunk_id: 0,
            })
            .unwrap();

        assert_eq!(
            reader.inner.truncate_calls,
            vec![
                TruncateOffset::Barrier { epoch: 1 },
                TruncateOffset::Chunk {
                    epoch: 2,
                    chunk_id: 0,
                },
            ]
        );

        reader
            .truncate(TruncateOffset::Barrier { epoch: 2 })
            .unwrap();
        assert_eq!(
            reader.inner.truncate_calls,
            vec![
                TruncateOffset::Barrier { epoch: 1 },
                TruncateOffset::Chunk {
                    epoch: 2,
                    chunk_id: 0,
                },
                TruncateOffset::Barrier { epoch: 2 },
            ]
        );
    }
}