risingwave_stream/executor/source/
source_backfill_executor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::ColumnId;
25use risingwave_common::id::SourceId;
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
27use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
28use risingwave_common::system_param::reader::SystemParamsRead;
29use risingwave_common::types::JsonbVal;
30use risingwave_common::util::epoch::EpochPair;
31use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
32use risingwave_connector::source::{
33    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
34    SplitMetaData,
35};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_storage::store::TryWaitEpochOptions;
38use serde::{Deserialize, Serialize};
39use thiserror_ext::AsReport;
40
41use super::executor_core::StreamSourceCore;
42use super::source_backfill_state_table::BackfillStateTableHandler;
43use super::{apply_rate_limit, get_split_offset_col_idx};
44use crate::common::rate_limit::limited_chunk_size;
45use crate::executor::UpdateMutation;
46use crate::executor::prelude::*;
47use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
48use crate::task::CreateMviewProgressReporter;
49
50#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
51pub enum BackfillState {
52    /// `None` means not started yet. It's the initial state.
53    /// XXX: perhaps we can also set to low-watermark instead of `None`
54    Backfilling(Option<String>),
55    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
56    SourceCachingUp(String),
57    Finished,
58}
59pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
60
61/// Only `state` field is the real state for fail-over.
62/// Other fields are for observability (but we still need to persist them).
63#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
64pub struct BackfillStateWithProgress {
65    pub state: BackfillState,
66    pub num_consumed_rows: u64,
67    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
68    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
69    /// so that we can finish backfilling even when upstream doesn't emit any data.
70    pub target_offset: Option<String>,
71}
72
73impl BackfillStateWithProgress {
74    pub fn encode_to_json(self) -> JsonbVal {
75        serde_json::to_value(self).unwrap().into()
76    }
77
78    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
79        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
80    }
81}
82
83pub struct SourceBackfillExecutor<S: StateStore> {
84    pub inner: SourceBackfillExecutorInner<S>,
85    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
86    pub input: Executor,
87}
88
89pub struct SourceBackfillExecutorInner<S: StateStore> {
90    actor_ctx: ActorContextRef,
91    info: ExecutorInfo,
92
93    /// Streaming source for external
94    source_id: SourceId,
95    source_name: String,
96    column_ids: Vec<ColumnId>,
97    source_desc_builder: Option<SourceDescBuilder>,
98    backfill_state_store: BackfillStateTableHandler<S>,
99
100    /// Metrics for monitor.
101    metrics: Arc<StreamingMetrics>,
102    source_split_change_count: LabelGuardedIntCounter,
103
104    // /// Receiver of barrier channel.
105    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
106    /// System parameter reader to read barrier interval
107    system_params: SystemParamsReaderRef,
108
109    /// Rate limit in rows/s.
110    rate_limit_rps: Option<u32>,
111
112    progress: CreateMviewProgressReporter,
113}
114
115/// Local variables used in the backfill stage.
116///
117/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
118///
119/// Note: all off the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
120#[derive(Debug)]
121struct BackfillStage {
122    states: BackfillStates,
123    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
124    ///
125    /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
126    splits: Vec<SplitImpl>,
127}
128
129enum ApplyMutationAfterBarrier {
130    SourceChangeSplit {
131        target_splits: Vec<SplitImpl>,
132        should_trim_state: bool,
133    },
134    ConnectorPropsChange,
135}
136
137impl BackfillStage {
138    fn total_backfilled_rows(&self) -> u64 {
139        self.states.values().map(|s| s.num_consumed_rows).sum()
140    }
141
142    fn debug_assert_consistent(&self) {
143        if cfg!(debug_assertions) {
144            let all_splits: HashSet<_> = self.splits.iter().map(|split| split.id()).collect();
145            assert_eq!(
146                self.states.keys().cloned().collect::<HashSet<_>>(),
147                all_splits
148            );
149        }
150    }
151
152    /// Get unfinished splits with latest offsets according to the backfill states.
153    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
154        let mut unfinished_splits = Vec::new();
155        for split in &self.splits {
156            let state = &self.states.get(split.id().as_ref()).unwrap().state;
157            match state {
158                BackfillState::Backfilling(Some(offset)) => {
159                    let mut updated_split = split.clone();
160                    updated_split.update_in_place(offset.clone())?;
161                    unfinished_splits.push(updated_split);
162                }
163                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
164                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
165            }
166        }
167        Ok(unfinished_splits)
168    }
169
170    /// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible.
171    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
172        let mut vis = false;
173        let state = self.states.get_mut(split_id).unwrap();
174        let state_inner = &mut state.state;
175        match state_inner {
176            BackfillState::Backfilling(None) => {
177                // backfilling for this split is not started yet. Ignore this row
178            }
179            BackfillState::Backfilling(Some(backfill_offset)) => {
180                match compare_kafka_offset(backfill_offset, offset) {
181                    Ordering::Less => {
182                        // continue backfilling. Ignore this row
183                    }
184                    Ordering::Equal => {
185                        // backfilling for this split is finished just right.
186                        *state_inner = BackfillState::Finished;
187                    }
188                    Ordering::Greater => {
189                        // backfilling for this split produced more data than current source's progress.
190                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
191                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
192                    }
193                }
194            }
195            BackfillState::SourceCachingUp(backfill_offset) => {
196                match compare_kafka_offset(backfill_offset, offset) {
197                    Ordering::Less => {
198                        // Source caught up, but doesn't contain the last backfilled row.
199                        // This may happen e.g., if Kafka performed compaction.
200                        vis = true;
201                        *state_inner = BackfillState::Finished;
202                    }
203                    Ordering::Equal => {
204                        // Source just caught up with backfilling.
205                        *state_inner = BackfillState::Finished;
206                    }
207                    Ordering::Greater => {
208                        // Source is still behind backfilling.
209                    }
210                }
211            }
212            BackfillState::Finished => {
213                vis = true;
214                // This split's backfilling is finished, we are waiting for other splits
215            }
216        }
217        if matches!(state_inner, BackfillState::Backfilling(_)) {
218            state.target_offset = Some(offset.to_owned());
219        }
220        if vis {
221            debug_assert_eq!(*state_inner, BackfillState::Finished);
222        }
223        vis
224    }
225
226    /// Updates backfill states and returns whether the row backfilled from external system is visible.
227    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
228        let state = self.states.get_mut(split_id).unwrap();
229        state.num_consumed_rows += 1;
230        let state_inner = &mut state.state;
231        match state_inner {
232            BackfillState::Backfilling(_old_offset) => {
233                if let Some(target_offset) = &state.target_offset
234                    && compare_kafka_offset(offset, target_offset).is_ge()
235                {
236                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
237                    // and dropping duplicated messages.
238                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
239                    // In this case, upstream hasn't reached the target_offset yet.
240                    //
241                    // Note2: after this, all following rows in the current chunk will be invisible.
242                    //
243                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
244                    // keep backfilling.
245                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
246                } else {
247                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
248                }
249                true
250            }
251            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
252                // backfilling stopped. ignore
253                false
254            }
255        }
256    }
257}
258
259impl<S: StateStore> SourceBackfillExecutorInner<S> {
260    #[expect(clippy::too_many_arguments)]
261    pub fn new(
262        actor_ctx: ActorContextRef,
263        info: ExecutorInfo,
264        stream_source_core: StreamSourceCore<S>,
265        metrics: Arc<StreamingMetrics>,
266        system_params: SystemParamsReaderRef,
267        backfill_state_store: BackfillStateTableHandler<S>,
268        rate_limit_rps: Option<u32>,
269        progress: CreateMviewProgressReporter,
270    ) -> Self {
271        let source_split_change_count = metrics
272            .source_split_change_count
273            .with_guarded_label_values(&[
274                &stream_source_core.source_id.to_string(),
275                &stream_source_core.source_name,
276                &actor_ctx.id.to_string(),
277                &actor_ctx.fragment_id.to_string(),
278            ]);
279
280        Self {
281            actor_ctx,
282            info,
283            source_id: stream_source_core.source_id,
284            source_name: stream_source_core.source_name,
285            column_ids: stream_source_core.column_ids,
286            source_desc_builder: stream_source_core.source_desc_builder,
287            backfill_state_store,
288            metrics,
289            source_split_change_count,
290            system_params,
291            rate_limit_rps,
292            progress,
293        }
294    }
295
296    async fn build_stream_source_reader(
297        &self,
298        source_desc: &SourceDesc,
299        splits: Vec<SplitImpl>,
300    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
301        let column_ids = source_desc
302            .columns
303            .iter()
304            .map(|column_desc| column_desc.column_id)
305            .collect_vec();
306        let source_ctx = SourceContext::new(
307            self.actor_ctx.id,
308            self.source_id,
309            self.actor_ctx.fragment_id,
310            self.source_name.clone(),
311            source_desc.metrics.clone(),
312            SourceCtrlOpts {
313                chunk_size: limited_chunk_size(self.rate_limit_rps),
314                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
315            },
316            source_desc.source.config.clone(),
317            None,
318        );
319
320        // We will check watermark to decide whether we need to backfill.
321        // e.g., when there's a Kafka topic-partition without any data,
322        // we don't need to backfill at all. But if we do not check here,
323        // the executor can only know it's finished when data coming in.
324        // For blocking DDL, this would be annoying.
325
326        let (stream, res) = source_desc
327            .source
328            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
329            .await
330            .map_err(StreamExecutorError::connector_error)?;
331        Ok((
332            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
333            res.backfill_info,
334        ))
335    }
336
337    #[try_stream(ok = Message, error = StreamExecutorError)]
338    async fn execute(mut self, input: Executor) {
339        let mut input = input.execute();
340
341        // Poll the upstream to get the first barrier.
342        let barrier = expect_first_barrier(&mut input).await?;
343        let first_epoch = barrier.epoch;
344        let owned_splits = barrier
345            .initial_split_assignment(self.actor_ctx.id)
346            .unwrap_or(&[])
347            .to_vec();
348
349        let mut pause_control = PauseControl::new();
350        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
351            pause_control.backfill_pause();
352        }
353        if barrier.is_pause_on_startup() {
354            pause_control.command_pause();
355        }
356        yield Message::Barrier(barrier);
357
358        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
359        let mut source_desc = source_desc_builder
360            .build()
361            .map_err(StreamExecutorError::connector_error)?;
362
363        // source backfill only applies to kafka, so we don't need to get pulsar's `message_id_data_idx`.
364        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
365        else {
366            unreachable!("Partition and offset columns must be set.");
367        };
368
369        self.backfill_state_store.init_epoch(first_epoch).await?;
370
371        let mut backfill_states: BackfillStates = HashMap::new();
372        {
373            let committed_reader = self
374                .backfill_state_store
375                .new_committed_reader(first_epoch)
376                .await?;
377            for split in &owned_splits {
378                let split_id = split.id();
379                let backfill_state = committed_reader
380                    .try_recover_from_state_store(&split_id)
381                    .await?
382                    .unwrap_or(BackfillStateWithProgress {
383                        state: BackfillState::Backfilling(None),
384                        num_consumed_rows: 0,
385                        target_offset: None, // init with None
386                    });
387                backfill_states.insert(split_id, backfill_state);
388            }
389        }
390        let mut backfill_stage = BackfillStage {
391            states: backfill_states,
392            splits: owned_splits,
393        };
394        backfill_stage.debug_assert_consistent();
395
396        let (source_chunk_reader, backfill_info) = self
397            .build_stream_source_reader(
398                &source_desc,
399                backfill_stage.get_latest_unfinished_splits()?,
400            )
401            .instrument_await("source_build_reader")
402            .await?;
403        for (split_id, info) in &backfill_info {
404            let state = backfill_stage.states.get_mut(split_id).unwrap();
405            match info {
406                BackfillInfo::NoDataToBackfill => {
407                    state.state = BackfillState::Finished;
408                }
409                BackfillInfo::HasDataToBackfill { latest_offset } => {
410                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
411                    state.target_offset = Some(latest_offset.clone());
412                }
413            }
414        }
415        tracing::debug!(?backfill_stage, "source backfill started");
416
417        fn select_strategy(_: &mut ()) -> PollNext {
418            futures::stream::PollNext::Left
419        }
420
421        // We choose "preferring upstream" strategy here, because:
422        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
423        //   For chunks from upstream, they are simply dropped, so there's no much overhead.
424        //   So possibly this can also affect other running jobs less.
425        // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
426        let mut backfill_stream = select_with_strategy(
427            input.by_ref().map(Either::Left),
428            source_chunk_reader.map(Either::Right),
429            select_strategy,
430        );
431
432        type PausedReader = Option<impl Stream>;
433        let mut paused_reader: PausedReader = None;
434
435        macro_rules! pause_reader {
436            () => {
437                if !pause_control.reader_paused {
438                    let (left, right) = backfill_stream.into_inner();
439                    backfill_stream = select_with_strategy(
440                        left,
441                        futures::stream::pending().boxed().map(Either::Right),
442                        select_strategy,
443                    );
444                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
445                    paused_reader = Some(right);
446                    pause_control.reader_paused = true;
447                }
448            };
449        }
450
451        macro_rules! resume_reader {
452            () => {
453                if pause_control.reader_paused {
454                    backfill_stream = select_with_strategy(
455                        input.by_ref().map(Either::Left),
456                        paused_reader
457                            .take()
458                            .expect("should have paused reader to resume"),
459                        select_strategy,
460                    );
461                    pause_control.reader_paused = false;
462                }
463            };
464        }
465
466        if pause_control.is_paused() {
467            pause_reader!();
468        }
469
470        let state_store = self
471            .backfill_state_store
472            .state_store()
473            .state_store()
474            .clone();
475        let table_id = self.backfill_state_store.state_store().table_id();
476        let mut state_table_initialized = false;
477        {
478            let source_backfill_row_count = self
479                .metrics
480                .source_backfill_row_count
481                .with_guarded_label_values(&[
482                    &self.source_id.to_string(),
483                    &self.source_name,
484                    &self.actor_ctx.id.to_string(),
485                    &self.actor_ctx.fragment_id.to_string(),
486                ]);
487
488            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
489            // milliseconds, considering some other latencies like network and cost in Meta.
490            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
491                as u128
492                * WAIT_BARRIER_MULTIPLE_TIMES;
493            let mut last_barrier_time = Instant::now();
494
495            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
496            'backfill_loop: while let Some(either) = backfill_stream.next().await {
497                match either {
498                    // Upstream
499                    Either::Left(msg) => {
500                        let Ok(msg) = msg else {
501                            let e = msg.unwrap_err();
502                            tracing::error!(
503                                error = ?e.as_report(),
504                                source_id = %self.source_id,
505                                "stream source reader error",
506                            );
507                            GLOBAL_ERROR_METRICS.user_source_error.report([
508                                "SourceReaderError".to_owned(),
509                                self.source_id.to_string(),
510                                self.source_name.clone(),
511                                self.actor_ctx.fragment_id.to_string(),
512                            ]);
513
514                            let (reader, _backfill_info) = self
515                                .build_stream_source_reader(
516                                    &source_desc,
517                                    backfill_stage.get_latest_unfinished_splits()?,
518                                )
519                                .await?;
520
521                            backfill_stream = select_with_strategy(
522                                input.by_ref().map(Either::Left),
523                                reader.map(Either::Right),
524                                select_strategy,
525                            );
526                            continue;
527                        };
528                        match msg {
529                            Message::Barrier(barrier) => {
530                                last_barrier_time = Instant::now();
531
532                                if pause_control.self_resume() {
533                                    resume_reader!();
534                                }
535
536                                let mut maybe_muatation = None;
537                                if let Some(ref mutation) = barrier.mutation.as_deref() {
538                                    match mutation {
539                                        Mutation::Pause => {
540                                            // pause_reader should not be invoked consecutively more than once.
541                                            pause_control.command_pause();
542                                            pause_reader!();
543                                        }
544                                        Mutation::Resume => {
545                                            // pause_reader.take should not be invoked consecutively more than once.
546                                            if pause_control.command_resume() {
547                                                resume_reader!();
548                                            }
549                                        }
550                                        Mutation::StartFragmentBackfill { fragment_ids } => {
551                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
552                                                && pause_control.backfill_resume()
553                                            {
554                                                resume_reader!();
555                                            }
556                                        }
557                                        Mutation::SourceChangeSplit(actor_splits) => {
558                                            tracing::info!(
559                                                actor_splits = ?actor_splits,
560                                                "source change split received"
561                                            );
562                                            maybe_muatation = actor_splits
563                                                .get(&self.actor_ctx.id)
564                                                .cloned()
565                                                .map(|target_splits| {
566                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
567                                                        target_splits,
568                                                        should_trim_state: true,
569                                                    }
570                                                });
571                                        }
572                                        Mutation::Update(UpdateMutation {
573                                            actor_splits, ..
574                                        }) => {
575                                            maybe_muatation = actor_splits
576                                                .get(&self.actor_ctx.id)
577                                                .cloned()
578                                                .map(|target_splits| {
579                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
580                                                        target_splits,
581                                                        should_trim_state: false,
582                                                    }
583                                                });
584                                        }
585                                        Mutation::ConnectorPropsChange(maybe_mutation) => {
586                                            if let Some(props_plaintext) =
587                                                maybe_mutation.get(&self.source_id.as_raw_id())
588                                            {
589                                                source_desc
590                                                    .update_reader(props_plaintext.clone())?;
591
592                                                maybe_muatation = Some(
593                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
594                                                );
595                                            }
596                                        }
597                                        Mutation::Throttle(actor_to_apply) => {
598                                            if let Some(new_rate_limit) =
599                                                actor_to_apply.get(&self.actor_ctx.id)
600                                                && *new_rate_limit != self.rate_limit_rps
601                                            {
602                                                tracing::info!(
603                                                    "updating rate limit from {:?} to {:?}",
604                                                    self.rate_limit_rps,
605                                                    *new_rate_limit
606                                                );
607                                                self.rate_limit_rps = *new_rate_limit;
608                                                // rebuild reader
609                                                let (reader, _backfill_info) = self
610                                                    .build_stream_source_reader(
611                                                        &source_desc,
612                                                        backfill_stage
613                                                            .get_latest_unfinished_splits()?,
614                                                    )
615                                                    .await?;
616
617                                                backfill_stream = select_with_strategy(
618                                                    input.by_ref().map(Either::Left),
619                                                    reader.map(Either::Right),
620                                                    select_strategy,
621                                                );
622                                            }
623                                        }
624                                        _ => {}
625                                    }
626                                }
627                                async fn rebuild_reader_on_split_changed(
628                                    this: &SourceBackfillExecutorInner<impl StateStore>,
629                                    backfill_stage: &BackfillStage,
630                                    source_desc: &SourceDesc,
631                                ) -> StreamExecutorResult<BoxSourceChunkStream>
632                                {
633                                    // rebuild backfill_stream
634                                    // Note: we don't put this part in a method, due to some complex lifetime issues.
635
636                                    let latest_unfinished_splits =
637                                        backfill_stage.get_latest_unfinished_splits()?;
638                                    tracing::info!(
639                                        "actor {:?} apply source split change to {:?}",
640                                        this.actor_ctx.id,
641                                        latest_unfinished_splits
642                                    );
643
644                                    // Replace the source reader with a new one of the new state.
645                                    let (reader, _backfill_info) = this
646                                        .build_stream_source_reader(
647                                            source_desc,
648                                            latest_unfinished_splits,
649                                        )
650                                        .await?;
651
652                                    Ok(reader)
653                                }
654
655                                self.backfill_state_store
656                                    .set_states(backfill_stage.states.clone())
657                                    .await?;
658                                self.backfill_state_store.commit(barrier.epoch).await?;
659
660                                if self.should_report_finished(&backfill_stage.states) {
661                                    // drop the backfill kafka consumers
662                                    backfill_stream = select_with_strategy(
663                                        input.by_ref().map(Either::Left),
664                                        futures::stream::pending().boxed().map(Either::Right),
665                                        select_strategy,
666                                    );
667
668                                    self.progress.finish(
669                                        barrier.epoch,
670                                        backfill_stage.total_backfilled_rows(),
671                                    );
672
673                                    let barrier_epoch = barrier.epoch;
674                                    let is_checkpoint = barrier.is_checkpoint();
675                                    // yield barrier after reporting progress
676                                    yield Message::Barrier(barrier);
677
678                                    if let Some(to_apply_mutation) = maybe_muatation {
679                                        self.apply_split_change_after_yield_barrier(
680                                            barrier_epoch,
681                                            &mut backfill_stage,
682                                            to_apply_mutation,
683                                        )
684                                        .await?;
685                                    }
686
687                                    if !state_table_initialized {
688                                        if is_checkpoint {
689                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
690                                            // We wait for 2nd epoch
691                                            let epoch = barrier_epoch.prev;
692                                            tracing::info!("waiting for epoch: {}", epoch);
693                                            state_store
694                                                .try_wait_epoch(
695                                                    HummockReadEpoch::Committed(epoch),
696                                                    TryWaitEpochOptions { table_id },
697                                                )
698                                                .await?;
699                                            tracing::info!("finished waiting for epoch: {}", epoch);
700                                            state_table_initialized = true;
701                                        }
702                                    } else {
703                                        // After we reported finished, we still don't exit the loop.
704                                        // Because we need to handle split migration.
705                                        assert!(
706                                            state_table_initialized,
707                                            "state table should be initialized before checking backfill finished"
708                                        );
709                                        if self.backfill_finished(&backfill_stage.states).await? {
710                                            tracing::info!("source backfill finished");
711                                            break 'backfill_loop;
712                                        }
713                                    }
714                                } else {
715                                    self.progress.update_for_source_backfill(
716                                        barrier.epoch,
717                                        backfill_stage.total_backfilled_rows(),
718                                    );
719
720                                    let barrier_epoch = barrier.epoch;
721                                    // yield barrier after reporting progress
722                                    yield Message::Barrier(barrier);
723
724                                    if let Some(to_apply_mutation) = maybe_muatation
725                                        && self
726                                            .apply_split_change_after_yield_barrier(
727                                                barrier_epoch,
728                                                &mut backfill_stage,
729                                                to_apply_mutation,
730                                            )
731                                            .await?
732                                    {
733                                        let reader = rebuild_reader_on_split_changed(
734                                            &self,
735                                            &backfill_stage,
736                                            &source_desc,
737                                        )
738                                        .await?;
739
740                                        backfill_stream = select_with_strategy(
741                                            input.by_ref().map(Either::Left),
742                                            reader.map(Either::Right),
743                                            select_strategy,
744                                        );
745                                    }
746                                }
747                            }
748                            Message::Chunk(chunk) => {
749                                // We need to iterate over all rows because there might be multiple splits in a chunk.
750                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
751                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
752
753                                for (i, (_, row)) in chunk.rows().enumerate() {
754                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
755                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
756                                    let vis = backfill_stage.handle_upstream_row(split, offset);
757                                    new_vis.set(i, vis);
758                                }
759                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
760                                let new_vis = new_vis.finish();
761                                if new_vis.count_ones() != 0 {
762                                    let new_chunk = chunk.clone_with_vis(new_vis);
763                                    yield Message::Chunk(new_chunk);
764                                }
765                            }
766                            Message::Watermark(_) => {
767                                // Ignore watermark during backfill.
768                            }
769                        }
770                    }
771                    // backfill
772                    Either::Right(msg) => {
773                        let chunk = msg?;
774
775                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
776                            // Pause to let barrier catch up via backpressure of snapshot stream.
777                            pause_control.self_pause();
778                            pause_reader!();
779
780                            // Exceeds the max wait barrier time, the source will be paused.
781                            // Currently we can guarantee the
782                            // source is not paused since it received stream
783                            // chunks.
784                            tracing::warn!(
785                                "source {} paused, wait barrier for {:?}",
786                                self.info.identity,
787                                last_barrier_time.elapsed()
788                            );
789
790                            // Only update `max_wait_barrier_time_ms` to capture
791                            // `barrier_interval_ms`
792                            // changes here to avoid frequently accessing the shared
793                            // `system_params`.
794                            max_wait_barrier_time_ms =
795                                self.system_params.load().barrier_interval_ms() as u128
796                                    * WAIT_BARRIER_MULTIPLE_TIMES;
797                        }
798                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
799
800                        for (i, (_, row)) in chunk.rows().enumerate() {
801                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
802                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
803                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
804                            new_vis.set(i, vis);
805                        }
806
807                        let new_vis = new_vis.finish();
808                        let card = new_vis.count_ones();
809                        if card != 0 {
810                            let new_chunk = chunk.clone_with_vis(new_vis);
811                            yield Message::Chunk(new_chunk);
812                            source_backfill_row_count.inc_by(card as u64);
813                        }
814                    }
815                }
816            }
817        }
818
819        std::mem::drop(backfill_stream);
820        let mut states = backfill_stage.states;
821        // Make sure `Finished` state is persisted.
822        self.backfill_state_store.set_states(states.clone()).await?;
823
824        // All splits finished backfilling. Now we only forward the source data.
825        #[for_await]
826        for msg in input {
827            let msg = msg?;
828            match msg {
829                Message::Barrier(barrier) => {
830                    let mut split_changed = None;
831                    if let Some(ref mutation) = barrier.mutation.as_deref() {
832                        match mutation {
833                            Mutation::Pause | Mutation::Resume => {
834                                // We don't need to do anything. Handled by upstream.
835                            }
836                            Mutation::SourceChangeSplit(actor_splits) => {
837                                tracing::info!(
838                                    actor_splits = ?actor_splits,
839                                    "source change split received"
840                                );
841                                split_changed = actor_splits
842                                    .get(&self.actor_ctx.id)
843                                    .cloned()
844                                    .map(|target_splits| (target_splits, &mut states, true));
845                            }
846                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
847                                split_changed = actor_splits
848                                    .get(&self.actor_ctx.id)
849                                    .cloned()
850                                    .map(|target_splits| (target_splits, &mut states, false));
851                            }
852                            _ => {}
853                        }
854                    }
855                    self.backfill_state_store.commit(barrier.epoch).await?;
856                    let barrier_epoch = barrier.epoch;
857                    yield Message::Barrier(barrier);
858
859                    if let Some((target_splits, state, should_trim_state)) = split_changed {
860                        self.apply_split_change_forward_stage_after_yield_barrier(
861                            barrier_epoch,
862                            target_splits,
863                            state,
864                            should_trim_state,
865                        )
866                        .await?;
867                    }
868                }
869                Message::Chunk(chunk) => {
870                    yield Message::Chunk(chunk);
871                }
872                Message::Watermark(watermark) => {
873                    yield Message::Watermark(watermark);
874                }
875            }
876        }
877    }
878
879    /// When we should call `progress.finish()` to let blocking DDL return.
880    /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
881    ///
882    /// Note: split migration (online scaling) is related with progress tracking.
883    /// - For foreground DDL, scaling is not allowed before progress is finished.
884    /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
885    ///
886    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
887    fn should_report_finished(&self, states: &BackfillStates) -> bool {
888        states.values().all(|state| {
889            matches!(
890                state.state,
891                BackfillState::Finished | BackfillState::SourceCachingUp(_)
892            )
893        })
894    }
895
896    /// All splits entered `Finished` state.
897    ///
898    /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
899    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
900    /// this actor, we still need to backfill it.
901    ///
902    /// Note: at the beginning, the actor will only read the state written by itself.
903    /// It needs to _wait until it can read all actors' written data_.
904    /// i.e., wait for the second checkpoint has been available.
905    ///
906    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
907    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
908        Ok(states
909            .values()
910            .all(|state| matches!(state.state, BackfillState::Finished))
911            && self
912                .backfill_state_store
913                .scan_may_stale()
914                .await?
915                .into_iter()
916                .all(|state| matches!(state.state, BackfillState::Finished)))
917    }
918
919    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
920    async fn apply_split_change_after_yield_barrier(
921        &mut self,
922        barrier_epoch: EpochPair,
923        stage: &mut BackfillStage,
924        to_apply_mutation: ApplyMutationAfterBarrier,
925    ) -> StreamExecutorResult<bool> {
926        match to_apply_mutation {
927            ApplyMutationAfterBarrier::SourceChangeSplit {
928                target_splits,
929                should_trim_state,
930            } => {
931                self.source_split_change_count.inc();
932                {
933                    if self
934                        .update_state_if_changed(
935                            barrier_epoch,
936                            target_splits,
937                            stage,
938                            should_trim_state,
939                        )
940                        .await?
941                    {
942                        // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
943                        Ok(true)
944                    } else {
945                        Ok(false)
946                    }
947                }
948            }
949            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
950        }
951    }
952
953    /// Returns `true` if split changed. Otherwise `false`.
954    async fn update_state_if_changed(
955        &mut self,
956        barrier_epoch: EpochPair,
957        target_splits: Vec<SplitImpl>,
958        stage: &mut BackfillStage,
959        should_trim_state: bool,
960    ) -> StreamExecutorResult<bool> {
961        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
962
963        let mut split_changed = false;
964        // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
965        // Will be set to target_state in the end.
966        let old_states = std::mem::take(&mut stage.states);
967        let committed_reader = self
968            .backfill_state_store
969            .new_committed_reader(barrier_epoch)
970            .await?;
971        // Iterate over the target (assigned) splits
972        // - check if any new splits are added
973        // - build target_state
974        for split in &target_splits {
975            let split_id = split.id();
976            if let Some(s) = old_states.get(&split_id) {
977                target_state.insert(split_id, s.clone());
978            } else {
979                split_changed = true;
980
981                let backfill_state = committed_reader
982                    .try_recover_from_state_store(&split_id)
983                    .await?;
984                match backfill_state {
985                    None => {
986                        // Newly added split. We don't need to backfill.
987                        // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
988                        target_state.insert(
989                            split_id,
990                            BackfillStateWithProgress {
991                                state: BackfillState::Finished,
992                                num_consumed_rows: 0,
993                                target_offset: None,
994                            },
995                        );
996                    }
997                    Some(backfill_state) => {
998                        // Migrated split. Backfill if unfinished.
999                        target_state.insert(split_id, backfill_state);
1000                    }
1001                }
1002            }
1003        }
1004
1005        // Checks dropped splits
1006        for existing_split_id in old_states.keys() {
1007            if !target_state.contains_key(existing_split_id) {
1008                tracing::info!("split dropping detected: {}", existing_split_id);
1009                split_changed = true;
1010            }
1011        }
1012
1013        if split_changed {
1014            let dropped_splits = stage
1015                .states
1016                .extract_if(|split_id, _| !target_state.contains_key(split_id))
1017                .map(|(split_id, _)| split_id);
1018
1019            if should_trim_state {
1020                // trim dropped splits' state
1021                self.backfill_state_store.trim_state(dropped_splits).await?;
1022            }
1023            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
1024        } else {
1025            debug_assert_eq!(old_states, target_state);
1026        }
1027        stage.states = target_state;
1028        stage.splits = target_splits;
1029        stage.debug_assert_consistent();
1030        Ok(split_changed)
1031    }
1032
1033    /// For split change during forward stage, all newly added splits should be already finished.
1034    // We just need to update the state store if necessary.
1035    async fn apply_split_change_forward_stage_after_yield_barrier(
1036        &mut self,
1037        barrier_epoch: EpochPair,
1038        target_splits: Vec<SplitImpl>,
1039        states: &mut BackfillStates,
1040        should_trim_state: bool,
1041    ) -> StreamExecutorResult<()> {
1042        self.source_split_change_count.inc();
1043        {
1044            self.update_state_if_changed_forward_stage(
1045                barrier_epoch,
1046                target_splits,
1047                states,
1048                should_trim_state,
1049            )
1050            .await?;
1051        }
1052
1053        Ok(())
1054    }
1055
1056    async fn update_state_if_changed_forward_stage(
1057        &mut self,
1058        barrier_epoch: EpochPair,
1059        target_splits: Vec<SplitImpl>,
1060        states: &mut BackfillStates,
1061        should_trim_state: bool,
1062    ) -> StreamExecutorResult<()> {
1063        let target_splits: HashSet<SplitId> =
1064            target_splits.into_iter().map(|split| split.id()).collect();
1065
1066        let mut split_changed = false;
1067        let mut newly_added_splits = vec![];
1068
1069        let committed_reader = self
1070            .backfill_state_store
1071            .new_committed_reader(barrier_epoch)
1072            .await?;
1073
1074        // Checks added splits
1075        for split_id in &target_splits {
1076            if !states.contains_key(split_id) {
1077                split_changed = true;
1078
1079                let backfill_state = committed_reader
1080                    .try_recover_from_state_store(split_id)
1081                    .await?;
1082                match &backfill_state {
1083                    None => {
1084                        // Newly added split. We don't need to backfill!
1085                        newly_added_splits.push(split_id.clone());
1086                    }
1087                    Some(backfill_state) => {
1088                        // Migrated split. It should also be finished since we are in forwarding stage.
1089                        match backfill_state.state {
1090                            BackfillState::Finished => {}
1091                            _ => {
1092                                return Err(anyhow::anyhow!(
1093                                    "Unexpected backfill state: {:?}",
1094                                    backfill_state
1095                                )
1096                                .into());
1097                            }
1098                        }
1099                    }
1100                }
1101                states.insert(
1102                    split_id.clone(),
1103                    backfill_state.unwrap_or(BackfillStateWithProgress {
1104                        state: BackfillState::Finished,
1105                        num_consumed_rows: 0,
1106                        target_offset: None,
1107                    }),
1108                );
1109            }
1110        }
1111
1112        // Checks dropped splits
1113        for existing_split_id in states.keys() {
1114            if !target_splits.contains(existing_split_id) {
1115                tracing::info!("split dropping detected: {}", existing_split_id);
1116                split_changed = true;
1117            }
1118        }
1119
1120        if split_changed {
1121            tracing::info!(
1122                target_splits = ?target_splits,
1123                "apply split change"
1124            );
1125
1126            let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1127
1128            if should_trim_state {
1129                // trim dropped splits' state
1130                self.backfill_state_store
1131                    .trim_state(dropped_splits.map(|(k, _v)| k))
1132                    .await?;
1133            }
1134
1135            // For migrated splits, and existing splits, we do not need to update
1136            // state store, but only for newly added splits.
1137            self.backfill_state_store
1138                .set_states(
1139                    newly_added_splits
1140                        .into_iter()
1141                        .map(|split_id| {
1142                            (
1143                                split_id,
1144                                BackfillStateWithProgress {
1145                                    state: BackfillState::Finished,
1146                                    num_consumed_rows: 0,
1147                                    target_offset: None,
1148                                },
1149                            )
1150                        })
1151                        .collect(),
1152                )
1153                .await?;
1154        }
1155
1156        Ok(())
1157    }
1158}
1159
/// Compares two offsets given as decimal strings by their numeric (`i64`) value.
///
/// # Panics
///
/// Panics if either argument cannot be parsed as an `i64`.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let parse_offset = |raw: &str| raw.parse::<i64>().unwrap();
    let (lhs, rhs) = (parse_offset(a), parse_offset(b));
    Ord::cmp(&lhs, &rhs)
}
1165
1166impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
1167    fn execute(self: Box<Self>) -> BoxedMessageStream {
1168        self.inner.execute(self.input).boxed()
1169    }
1170}
1171
1172impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1173    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1174        f.debug_struct("SourceBackfillExecutor")
1175            .field("source_id", &self.source_id)
1176            .field("column_ids", &self.column_ids)
1177            .field("stream_key", &self.info.stream_key)
1178            .finish()
1179    }
1180}
1181
1182struct PauseControl {
1183    // Paused due to backfill order control
1184    backfill_paused: bool,
1185    // Paused due to self-pause, e.g. let barrier catch up
1186    self_paused: bool,
1187    // Paused due to Pause command from meta, pause_on_next_bootstrap
1188    command_paused: bool,
1189    // reader paused
1190    reader_paused: bool,
1191}
1192
1193impl PauseControl {
1194    fn new() -> Self {
1195        Self {
1196            backfill_paused: false,
1197            self_paused: false,
1198            command_paused: false,
1199            reader_paused: false,
1200        }
1201    }
1202
1203    fn is_paused(&self) -> bool {
1204        self.backfill_paused || self.command_paused || self.self_paused
1205    }
1206
1207    /// returns whether we need to pause the reader.
1208    fn backfill_pause(&mut self) {
1209        if self.backfill_paused {
1210            tracing::warn!("backfill_pause invoked twice");
1211        }
1212        self.backfill_paused = true;
1213    }
1214
1215    /// returns whether we need to resume the reader.
1216    /// same precedence as command.
1217    fn backfill_resume(&mut self) -> bool {
1218        if !self.backfill_paused {
1219            tracing::warn!("backfill_resume invoked twice");
1220        }
1221        !self.command_paused
1222    }
1223
1224    /// returns whether we need to pause the reader.
1225    fn self_pause(&mut self) {
1226        assert!(
1227            !self.backfill_paused,
1228            "backfill stream should not be read when backfill_pause is set"
1229        );
1230        assert!(
1231            !self.command_paused,
1232            "backfill stream should not be read when command_pause is set"
1233        );
1234        if self.self_paused {
1235            tracing::warn!("self_pause invoked twice");
1236        }
1237        self.self_paused = true;
1238    }
1239
1240    /// returns whether we need to resume the reader.
1241    /// `self_resume` has the lowest precedence,
1242    /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
1243    fn self_resume(&mut self) -> bool {
1244        self.self_paused = false;
1245        !(self.backfill_paused || self.command_paused)
1246    }
1247
1248    /// returns whether we need to pause the reader.
1249    fn command_pause(&mut self) {
1250        if self.command_paused {
1251            tracing::warn!("command_pause invoked twice");
1252        }
1253        self.command_paused = true;
1254    }
1255
1256    /// returns whether we need to resume the reader.
1257    /// same precedence as backfill.
1258    fn command_resume(&mut self) -> bool {
1259        if !self.command_paused {
1260            tracing::warn!("command_resume invoked twice");
1261        }
1262        self.command_paused = false;
1263        !self.backfill_paused
1264    }
1265}