risingwave_stream/executor/source/
source_backfill_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::{ColumnId, TableId};
25use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
26use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::types::JsonbVal;
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::{
32    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
33    SplitMetaData,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use serde::{Deserialize, Serialize};
38use thiserror_ext::AsReport;
39
40use super::executor_core::StreamSourceCore;
41use super::source_backfill_state_table::BackfillStateTableHandler;
42use super::{apply_rate_limit, get_split_offset_col_idx};
43use crate::common::rate_limit::limited_chunk_size;
44use crate::executor::UpdateMutation;
45use crate::executor::prelude::*;
46use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
47use crate::task::CreateMviewProgressReporter;
48
49#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
50pub enum BackfillState {
51    /// `None` means not started yet. It's the initial state.
52    /// XXX: perhaps we can also set to low-watermark instead of `None`
53    Backfilling(Option<String>),
54    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
55    SourceCachingUp(String),
56    Finished,
57}
58pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
59
60/// Only `state` field is the real state for fail-over.
61/// Other fields are for observability (but we still need to persist them).
62#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
63pub struct BackfillStateWithProgress {
64    pub state: BackfillState,
65    pub num_consumed_rows: u64,
66    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
67    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
68    /// so that we can finish backfilling even when upstream doesn't emit any data.
69    pub target_offset: Option<String>,
70}
71
72impl BackfillStateWithProgress {
73    pub fn encode_to_json(self) -> JsonbVal {
74        serde_json::to_value(self).unwrap().into()
75    }
76
77    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
78        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
79    }
80}
81
82pub struct SourceBackfillExecutor<S: StateStore> {
83    pub inner: SourceBackfillExecutorInner<S>,
84    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
85    pub input: Executor,
86}
87
88pub struct SourceBackfillExecutorInner<S: StateStore> {
89    actor_ctx: ActorContextRef,
90    info: ExecutorInfo,
91
92    /// Streaming source for external
93    source_id: TableId,
94    source_name: String,
95    column_ids: Vec<ColumnId>,
96    source_desc_builder: Option<SourceDescBuilder>,
97    backfill_state_store: BackfillStateTableHandler<S>,
98
99    /// Metrics for monitor.
100    metrics: Arc<StreamingMetrics>,
101    source_split_change_count: LabelGuardedIntCounter,
102
103    // /// Receiver of barrier channel.
104    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
105    /// System parameter reader to read barrier interval
106    system_params: SystemParamsReaderRef,
107
108    /// Rate limit in rows/s.
109    rate_limit_rps: Option<u32>,
110
111    progress: CreateMviewProgressReporter,
112}
113
114/// Local variables used in the backfill stage.
115///
116/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
117///
118/// Note: all off the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
119#[derive(Debug)]
120struct BackfillStage {
121    states: BackfillStates,
122    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
123    ///
124    /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
125    splits: Vec<SplitImpl>,
126}
127
128enum ApplyMutationAfterBarrier {
129    SourceChangeSplit {
130        target_splits: Vec<SplitImpl>,
131        should_trim_state: bool,
132    },
133    ConnectorPropsChange,
134}
135
136impl BackfillStage {
137    fn total_backfilled_rows(&self) -> u64 {
138        self.states.values().map(|s| s.num_consumed_rows).sum()
139    }
140
141    fn debug_assert_consistent(&self) {
142        if cfg!(debug_assertions) {
143            let all_splits: HashSet<_> =
144                self.splits.iter().map(|split| split.id().clone()).collect();
145            assert_eq!(
146                self.states.keys().cloned().collect::<HashSet<_>>(),
147                all_splits
148            );
149        }
150    }
151
152    /// Get unfinished splits with latest offsets according to the backfill states.
153    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
154        let mut unfinished_splits = Vec::new();
155        for split in &self.splits {
156            let state = &self.states.get(split.id().as_ref()).unwrap().state;
157            match state {
158                BackfillState::Backfilling(Some(offset)) => {
159                    let mut updated_split = split.clone();
160                    updated_split.update_in_place(offset.clone())?;
161                    unfinished_splits.push(updated_split);
162                }
163                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
164                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
165            }
166        }
167        Ok(unfinished_splits)
168    }
169
170    /// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible.
171    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
172        let mut vis = false;
173        let state = self.states.get_mut(split_id).unwrap();
174        let state_inner = &mut state.state;
175        match state_inner {
176            BackfillState::Backfilling(None) => {
177                // backfilling for this split is not started yet. Ignore this row
178            }
179            BackfillState::Backfilling(Some(backfill_offset)) => {
180                match compare_kafka_offset(backfill_offset, offset) {
181                    Ordering::Less => {
182                        // continue backfilling. Ignore this row
183                    }
184                    Ordering::Equal => {
185                        // backfilling for this split is finished just right.
186                        *state_inner = BackfillState::Finished;
187                    }
188                    Ordering::Greater => {
189                        // backfilling for this split produced more data than current source's progress.
190                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
191                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
192                    }
193                }
194            }
195            BackfillState::SourceCachingUp(backfill_offset) => {
196                match compare_kafka_offset(backfill_offset, offset) {
197                    Ordering::Less => {
198                        // Source caught up, but doesn't contain the last backfilled row.
199                        // This may happen e.g., if Kafka performed compaction.
200                        vis = true;
201                        *state_inner = BackfillState::Finished;
202                    }
203                    Ordering::Equal => {
204                        // Source just caught up with backfilling.
205                        *state_inner = BackfillState::Finished;
206                    }
207                    Ordering::Greater => {
208                        // Source is still behind backfilling.
209                    }
210                }
211            }
212            BackfillState::Finished => {
213                vis = true;
214                // This split's backfilling is finished, we are waiting for other splits
215            }
216        }
217        if matches!(state_inner, BackfillState::Backfilling(_)) {
218            state.target_offset = Some(offset.to_owned());
219        }
220        if vis {
221            debug_assert_eq!(*state_inner, BackfillState::Finished);
222        }
223        vis
224    }
225
226    /// Updates backfill states and returns whether the row backfilled from external system is visible.
227    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
228        let state = self.states.get_mut(split_id).unwrap();
229        state.num_consumed_rows += 1;
230        let state_inner = &mut state.state;
231        match state_inner {
232            BackfillState::Backfilling(_old_offset) => {
233                if let Some(target_offset) = &state.target_offset
234                    && compare_kafka_offset(offset, target_offset).is_ge()
235                {
236                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
237                    // and dropping duplicated messages.
238                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
239                    // In this case, upstream hasn't reached the target_offset yet.
240                    //
241                    // Note2: after this, all following rows in the current chunk will be invisible.
242                    //
243                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
244                    // keep backfilling.
245                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
246                } else {
247                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
248                }
249                true
250            }
251            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
252                // backfilling stopped. ignore
253                false
254            }
255        }
256    }
257}
258
259impl<S: StateStore> SourceBackfillExecutorInner<S> {
260    #[expect(clippy::too_many_arguments)]
261    pub fn new(
262        actor_ctx: ActorContextRef,
263        info: ExecutorInfo,
264        stream_source_core: StreamSourceCore<S>,
265        metrics: Arc<StreamingMetrics>,
266        system_params: SystemParamsReaderRef,
267        backfill_state_store: BackfillStateTableHandler<S>,
268        rate_limit_rps: Option<u32>,
269        progress: CreateMviewProgressReporter,
270    ) -> Self {
271        let source_split_change_count = metrics
272            .source_split_change_count
273            .with_guarded_label_values(&[
274                &stream_source_core.source_id.to_string(),
275                &stream_source_core.source_name,
276                &actor_ctx.id.to_string(),
277                &actor_ctx.fragment_id.to_string(),
278            ]);
279
280        Self {
281            actor_ctx,
282            info,
283            source_id: stream_source_core.source_id,
284            source_name: stream_source_core.source_name,
285            column_ids: stream_source_core.column_ids,
286            source_desc_builder: stream_source_core.source_desc_builder,
287            backfill_state_store,
288            metrics,
289            source_split_change_count,
290            system_params,
291            rate_limit_rps,
292            progress,
293        }
294    }
295
296    async fn build_stream_source_reader(
297        &self,
298        source_desc: &SourceDesc,
299        splits: Vec<SplitImpl>,
300    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
301        let column_ids = source_desc
302            .columns
303            .iter()
304            .map(|column_desc| column_desc.column_id)
305            .collect_vec();
306        let source_ctx = SourceContext::new(
307            self.actor_ctx.id,
308            self.source_id,
309            self.actor_ctx.fragment_id,
310            self.source_name.clone(),
311            source_desc.metrics.clone(),
312            SourceCtrlOpts {
313                chunk_size: limited_chunk_size(self.rate_limit_rps),
314                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
315            },
316            source_desc.source.config.clone(),
317            None,
318        );
319
320        // We will check watermark to decide whether we need to backfill.
321        // e.g., when there's a Kafka topic-partition without any data,
322        // we don't need to backfill at all. But if we do not check here,
323        // the executor can only know it's finished when data coming in.
324        // For blocking DDL, this would be annoying.
325
326        let (stream, res) = source_desc
327            .source
328            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
329            .await
330            .map_err(StreamExecutorError::connector_error)?;
331        Ok((
332            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
333            res.backfill_info,
334        ))
335    }
336
337    #[try_stream(ok = Message, error = StreamExecutorError)]
338    async fn execute(mut self, input: Executor) {
339        let mut input = input.execute();
340
341        // Poll the upstream to get the first barrier.
342        let barrier = expect_first_barrier(&mut input).await?;
343        let first_epoch = barrier.epoch;
344        let owned_splits = barrier
345            .initial_split_assignment(self.actor_ctx.id)
346            .unwrap_or(&[])
347            .to_vec();
348
349        let mut pause_control = PauseControl::new();
350        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
351            pause_control.backfill_pause();
352        }
353        if barrier.is_pause_on_startup() {
354            pause_control.command_pause();
355        }
356        yield Message::Barrier(barrier);
357
358        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
359        let mut source_desc = source_desc_builder
360            .build()
361            .map_err(StreamExecutorError::connector_error)?;
362        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
363        else {
364            unreachable!("Partition and offset columns must be set.");
365        };
366
367        self.backfill_state_store.init_epoch(first_epoch).await?;
368
369        let mut backfill_states: BackfillStates = HashMap::new();
370        {
371            let committed_reader = self
372                .backfill_state_store
373                .new_committed_reader(first_epoch)
374                .await?;
375            for split in &owned_splits {
376                let split_id = split.id();
377                let backfill_state = committed_reader
378                    .try_recover_from_state_store(&split_id)
379                    .await?
380                    .unwrap_or(BackfillStateWithProgress {
381                        state: BackfillState::Backfilling(None),
382                        num_consumed_rows: 0,
383                        target_offset: None, // init with None
384                    });
385                backfill_states.insert(split_id, backfill_state);
386            }
387        }
388        let mut backfill_stage = BackfillStage {
389            states: backfill_states,
390            splits: owned_splits,
391        };
392        backfill_stage.debug_assert_consistent();
393
394        let (source_chunk_reader, backfill_info) = self
395            .build_stream_source_reader(
396                &source_desc,
397                backfill_stage.get_latest_unfinished_splits()?,
398            )
399            .instrument_await("source_build_reader")
400            .await?;
401        for (split_id, info) in &backfill_info {
402            let state = backfill_stage.states.get_mut(split_id).unwrap();
403            match info {
404                BackfillInfo::NoDataToBackfill => {
405                    state.state = BackfillState::Finished;
406                }
407                BackfillInfo::HasDataToBackfill { latest_offset } => {
408                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
409                    state.target_offset = Some(latest_offset.clone());
410                }
411            }
412        }
413        tracing::debug!(?backfill_stage, "source backfill started");
414
415        fn select_strategy(_: &mut ()) -> PollNext {
416            futures::stream::PollNext::Left
417        }
418
419        // We choose "preferring upstream" strategy here, because:
420        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
421        //   For chunks from upstream, they are simply dropped, so there's no much overhead.
422        //   So possibly this can also affect other running jobs less.
423        // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
424        let mut backfill_stream = select_with_strategy(
425            input.by_ref().map(Either::Left),
426            source_chunk_reader.map(Either::Right),
427            select_strategy,
428        );
429
430        type PausedReader = Option<impl Stream>;
431        let mut paused_reader: PausedReader = None;
432
433        macro_rules! pause_reader {
434            () => {
435                if !pause_control.reader_paused {
436                    let (left, right) = backfill_stream.into_inner();
437                    backfill_stream = select_with_strategy(
438                        left,
439                        futures::stream::pending().boxed().map(Either::Right),
440                        select_strategy,
441                    );
442                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
443                    paused_reader = Some(right);
444                    pause_control.reader_paused = true;
445                }
446            };
447        }
448
449        macro_rules! resume_reader {
450            () => {
451                if pause_control.reader_paused {
452                    backfill_stream = select_with_strategy(
453                        input.by_ref().map(Either::Left),
454                        paused_reader
455                            .take()
456                            .expect("should have paused reader to resume"),
457                        select_strategy,
458                    );
459                    pause_control.reader_paused = false;
460                }
461            };
462        }
463
464        if pause_control.is_paused() {
465            pause_reader!();
466        }
467
468        let state_store = self
469            .backfill_state_store
470            .state_store()
471            .state_store()
472            .clone();
473        let table_id = self.backfill_state_store.state_store().table_id().into();
474        let mut state_table_initialized = false;
475        {
476            let source_backfill_row_count = self
477                .metrics
478                .source_backfill_row_count
479                .with_guarded_label_values(&[
480                    &self.source_id.to_string(),
481                    &self.source_name,
482                    &self.actor_ctx.id.to_string(),
483                    &self.actor_ctx.fragment_id.to_string(),
484                ]);
485
486            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
487            // milliseconds, considering some other latencies like network and cost in Meta.
488            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
489                as u128
490                * WAIT_BARRIER_MULTIPLE_TIMES;
491            let mut last_barrier_time = Instant::now();
492
493            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
494            'backfill_loop: while let Some(either) = backfill_stream.next().await {
495                match either {
496                    // Upstream
497                    Either::Left(msg) => {
498                        let Ok(msg) = msg else {
499                            let e = msg.unwrap_err();
500                            tracing::warn!(
501                                error = ?e.as_report(),
502                                source_id = %self.source_id,
503                                "stream source reader error",
504                            );
505                            GLOBAL_ERROR_METRICS.user_source_error.report([
506                                "SourceReaderError".to_owned(),
507                                self.source_id.to_string(),
508                                self.source_name.to_owned(),
509                                self.actor_ctx.fragment_id.to_string(),
510                            ]);
511
512                            let (reader, _backfill_info) = self
513                                .build_stream_source_reader(
514                                    &source_desc,
515                                    backfill_stage.get_latest_unfinished_splits()?,
516                                )
517                                .await?;
518
519                            backfill_stream = select_with_strategy(
520                                input.by_ref().map(Either::Left),
521                                reader.map(Either::Right),
522                                select_strategy,
523                            );
524                            continue;
525                        };
526                        match msg {
527                            Message::Barrier(barrier) => {
528                                last_barrier_time = Instant::now();
529
530                                if pause_control.self_resume() {
531                                    resume_reader!();
532                                }
533
534                                let mut maybe_muatation = None;
535                                if let Some(ref mutation) = barrier.mutation.as_deref() {
536                                    match mutation {
537                                        Mutation::Pause => {
538                                            // pause_reader should not be invoked consecutively more than once.
539                                            pause_control.command_pause();
540                                            pause_reader!();
541                                        }
542                                        Mutation::Resume => {
543                                            // pause_reader.take should not be invoked consecutively more than once.
544                                            if pause_control.command_resume() {
545                                                resume_reader!();
546                                            }
547                                        }
548                                        Mutation::StartFragmentBackfill { fragment_ids } => {
549                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
550                                                && pause_control.backfill_resume()
551                                            {
552                                                resume_reader!();
553                                            }
554                                        }
555                                        Mutation::SourceChangeSplit(actor_splits) => {
556                                            tracing::info!(
557                                                actor_splits = ?actor_splits,
558                                                "source change split received"
559                                            );
560                                            maybe_muatation = actor_splits
561                                                .get(&self.actor_ctx.id)
562                                                .cloned()
563                                                .map(|target_splits| {
564                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
565                                                        target_splits,
566                                                        should_trim_state: true,
567                                                    }
568                                                });
569                                        }
570                                        Mutation::Update(UpdateMutation {
571                                            actor_splits, ..
572                                        }) => {
573                                            maybe_muatation = actor_splits
574                                                .get(&self.actor_ctx.id)
575                                                .cloned()
576                                                .map(|target_splits| {
577                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
578                                                        target_splits,
579                                                        should_trim_state: false,
580                                                    }
581                                                });
582                                        }
583                                        Mutation::ConnectorPropsChange(maybe_mutation) => {
584                                            if let Some(props_plaintext) =
585                                                maybe_mutation.get(&self.source_id.table_id())
586                                            {
587                                                source_desc
588                                                    .update_reader(props_plaintext.clone())?;
589
590                                                maybe_muatation = Some(
591                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
592                                                );
593                                            }
594                                        }
595                                        Mutation::Throttle(actor_to_apply) => {
596                                            if let Some(new_rate_limit) =
597                                                actor_to_apply.get(&self.actor_ctx.id)
598                                                && *new_rate_limit != self.rate_limit_rps
599                                            {
600                                                tracing::info!(
601                                                    "updating rate limit from {:?} to {:?}",
602                                                    self.rate_limit_rps,
603                                                    *new_rate_limit
604                                                );
605                                                self.rate_limit_rps = *new_rate_limit;
606                                                // rebuild reader
607                                                let (reader, _backfill_info) = self
608                                                    .build_stream_source_reader(
609                                                        &source_desc,
610                                                        backfill_stage
611                                                            .get_latest_unfinished_splits()?,
612                                                    )
613                                                    .await?;
614
615                                                backfill_stream = select_with_strategy(
616                                                    input.by_ref().map(Either::Left),
617                                                    reader.map(Either::Right),
618                                                    select_strategy,
619                                                );
620                                            }
621                                        }
622                                        _ => {}
623                                    }
624                                }
625                                async fn rebuild_reader_on_split_changed(
626                                    this: &SourceBackfillExecutorInner<impl StateStore>,
627                                    backfill_stage: &BackfillStage,
628                                    source_desc: &SourceDesc,
629                                ) -> StreamExecutorResult<BoxSourceChunkStream>
630                                {
631                                    // rebuild backfill_stream
632                                    // Note: we don't put this part in a method, due to some complex lifetime issues.
633
634                                    let latest_unfinished_splits =
635                                        backfill_stage.get_latest_unfinished_splits()?;
636                                    tracing::info!(
637                                        "actor {:?} apply source split change to {:?}",
638                                        this.actor_ctx.id,
639                                        latest_unfinished_splits
640                                    );
641
642                                    // Replace the source reader with a new one of the new state.
643                                    let (reader, _backfill_info) = this
644                                        .build_stream_source_reader(
645                                            source_desc,
646                                            latest_unfinished_splits,
647                                        )
648                                        .await?;
649
650                                    Ok(reader)
651                                }
652
653                                self.backfill_state_store
654                                    .set_states(backfill_stage.states.clone())
655                                    .await?;
656                                self.backfill_state_store.commit(barrier.epoch).await?;
657
658                                if self.should_report_finished(&backfill_stage.states) {
659                                    // drop the backfill kafka consumers
660                                    backfill_stream = select_with_strategy(
661                                        input.by_ref().map(Either::Left),
662                                        futures::stream::pending().boxed().map(Either::Right),
663                                        select_strategy,
664                                    );
665
666                                    self.progress.finish(
667                                        barrier.epoch,
668                                        backfill_stage.total_backfilled_rows(),
669                                    );
670
671                                    let barrier_epoch = barrier.epoch;
672                                    let is_checkpoint = barrier.is_checkpoint();
673                                    // yield barrier after reporting progress
674                                    yield Message::Barrier(barrier);
675
676                                    if let Some(to_apply_mutation) = maybe_muatation {
677                                        self.apply_split_change_after_yield_barrier(
678                                            barrier_epoch,
679                                            &mut backfill_stage,
680                                            to_apply_mutation,
681                                        )
682                                        .await?;
683                                    }
684
685                                    if !state_table_initialized {
686                                        if is_checkpoint {
687                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
688                                            // We wait for 2nd epoch
689                                            let epoch = barrier_epoch.prev;
690                                            tracing::info!("waiting for epoch: {}", epoch);
691                                            state_store
692                                                .try_wait_epoch(
693                                                    HummockReadEpoch::Committed(epoch),
694                                                    TryWaitEpochOptions { table_id },
695                                                )
696                                                .await?;
697                                            tracing::info!("finished waiting for epoch: {}", epoch);
698                                            state_table_initialized = true;
699                                        }
700                                    } else {
701                                        // After we reported finished, we still don't exit the loop.
702                                        // Because we need to handle split migration.
703                                        assert!(
704                                            state_table_initialized,
705                                            "state table should be initialized before checking backfill finished"
706                                        );
707                                        if self.backfill_finished(&backfill_stage.states).await? {
708                                            tracing::info!("source backfill finished");
709                                            break 'backfill_loop;
710                                        }
711                                    }
712                                } else {
713                                    self.progress.update_for_source_backfill(
714                                        barrier.epoch,
715                                        backfill_stage.total_backfilled_rows(),
716                                    );
717
718                                    let barrier_epoch = barrier.epoch;
719                                    // yield barrier after reporting progress
720                                    yield Message::Barrier(barrier);
721
722                                    if let Some(to_apply_mutation) = maybe_muatation {
723                                        if self
724                                            .apply_split_change_after_yield_barrier(
725                                                barrier_epoch,
726                                                &mut backfill_stage,
727                                                to_apply_mutation,
728                                            )
729                                            .await?
730                                        {
731                                            let reader = rebuild_reader_on_split_changed(
732                                                &self,
733                                                &backfill_stage,
734                                                &source_desc,
735                                            )
736                                            .await?;
737
738                                            backfill_stream = select_with_strategy(
739                                                input.by_ref().map(Either::Left),
740                                                reader.map(Either::Right),
741                                                select_strategy,
742                                            );
743                                        }
744                                    }
745                                }
746                            }
747                            Message::Chunk(chunk) => {
748                                // We need to iterate over all rows because there might be multiple splits in a chunk.
749                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
750                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
751
752                                for (i, (_, row)) in chunk.rows().enumerate() {
753                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
754                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
755                                    let vis = backfill_stage.handle_upstream_row(split, offset);
756                                    new_vis.set(i, vis);
757                                }
758                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
759                                let new_vis = new_vis.finish();
760                                if new_vis.count_ones() != 0 {
761                                    let new_chunk = chunk.clone_with_vis(new_vis);
762                                    yield Message::Chunk(new_chunk);
763                                }
764                            }
765                            Message::Watermark(_) => {
766                                // Ignore watermark during backfill.
767                            }
768                        }
769                    }
770                    // backfill
771                    Either::Right(msg) => {
772                        let chunk = msg?;
773
774                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
775                            // Pause to let barrier catch up via backpressure of snapshot stream.
776                            pause_control.self_pause();
777                            pause_reader!();
778
779                            // Exceeds the max wait barrier time, the source will be paused.
780                            // Currently we can guarantee the
781                            // source is not paused since it received stream
782                            // chunks.
783                            tracing::warn!(
784                                "source {} paused, wait barrier for {:?}",
785                                self.info.identity,
786                                last_barrier_time.elapsed()
787                            );
788
789                            // Only update `max_wait_barrier_time_ms` to capture
790                            // `barrier_interval_ms`
791                            // changes here to avoid frequently accessing the shared
792                            // `system_params`.
793                            max_wait_barrier_time_ms =
794                                self.system_params.load().barrier_interval_ms() as u128
795                                    * WAIT_BARRIER_MULTIPLE_TIMES;
796                        }
797                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
798
799                        for (i, (_, row)) in chunk.rows().enumerate() {
800                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
801                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
802                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
803                            new_vis.set(i, vis);
804                        }
805
806                        let new_vis = new_vis.finish();
807                        let card = new_vis.count_ones();
808                        if card != 0 {
809                            let new_chunk = chunk.clone_with_vis(new_vis);
810                            yield Message::Chunk(new_chunk);
811                            source_backfill_row_count.inc_by(card as u64);
812                        }
813                    }
814                }
815            }
816        }
817
818        std::mem::drop(backfill_stream);
819        let mut states = backfill_stage.states;
820        // Make sure `Finished` state is persisted.
821        self.backfill_state_store.set_states(states.clone()).await?;
822
823        // All splits finished backfilling. Now we only forward the source data.
824        #[for_await]
825        for msg in input {
826            let msg = msg?;
827            match msg {
828                Message::Barrier(barrier) => {
829                    let mut split_changed = None;
830                    if let Some(ref mutation) = barrier.mutation.as_deref() {
831                        match mutation {
832                            Mutation::Pause | Mutation::Resume => {
833                                // We don't need to do anything. Handled by upstream.
834                            }
835                            Mutation::SourceChangeSplit(actor_splits) => {
836                                tracing::info!(
837                                    actor_splits = ?actor_splits,
838                                    "source change split received"
839                                );
840                                split_changed = actor_splits
841                                    .get(&self.actor_ctx.id)
842                                    .cloned()
843                                    .map(|target_splits| (target_splits, &mut states, true));
844                            }
845                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
846                                split_changed = actor_splits
847                                    .get(&self.actor_ctx.id)
848                                    .cloned()
849                                    .map(|target_splits| (target_splits, &mut states, false));
850                            }
851                            _ => {}
852                        }
853                    }
854                    self.backfill_state_store.commit(barrier.epoch).await?;
855                    let barrier_epoch = barrier.epoch;
856                    yield Message::Barrier(barrier);
857
858                    if let Some((target_splits, state, should_trim_state)) = split_changed {
859                        self.apply_split_change_forward_stage_after_yield_barrier(
860                            barrier_epoch,
861                            target_splits,
862                            state,
863                            should_trim_state,
864                        )
865                        .await?;
866                    }
867                }
868                Message::Chunk(chunk) => {
869                    yield Message::Chunk(chunk);
870                }
871                Message::Watermark(watermark) => {
872                    yield Message::Watermark(watermark);
873                }
874            }
875        }
876    }
877
878    /// When we should call `progress.finish()` to let blocking DDL return.
879    /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
880    ///
881    /// Note: split migration (online scaling) is related with progress tracking.
882    /// - For foreground DDL, scaling is not allowed before progress is finished.
883    /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
884    ///
885    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
886    fn should_report_finished(&self, states: &BackfillStates) -> bool {
887        states.values().all(|state| {
888            matches!(
889                state.state,
890                BackfillState::Finished | BackfillState::SourceCachingUp(_)
891            )
892        })
893    }
894
895    /// All splits entered `Finished` state.
896    ///
897    /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
898    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
899    /// this actor, we still need to backfill it.
900    ///
901    /// Note: at the beginning, the actor will only read the state written by itself.
902    /// It needs to _wait until it can read all actors' written data_.
903    /// i.e., wait for the second checkpoint has been available.
904    ///
905    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
906    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
907        Ok(states
908            .values()
909            .all(|state| matches!(state.state, BackfillState::Finished))
910            && self
911                .backfill_state_store
912                .scan_may_stale()
913                .await?
914                .into_iter()
915                .all(|state| matches!(state.state, BackfillState::Finished)))
916    }
917
918    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
919    async fn apply_split_change_after_yield_barrier(
920        &mut self,
921        barrier_epoch: EpochPair,
922        stage: &mut BackfillStage,
923        to_apply_mutation: ApplyMutationAfterBarrier,
924    ) -> StreamExecutorResult<bool> {
925        match to_apply_mutation {
926            ApplyMutationAfterBarrier::SourceChangeSplit {
927                target_splits,
928                should_trim_state,
929            } => {
930                self.source_split_change_count.inc();
931                {
932                    if self
933                        .update_state_if_changed(
934                            barrier_epoch,
935                            target_splits,
936                            stage,
937                            should_trim_state,
938                        )
939                        .await?
940                    {
941                        // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
942                        Ok(true)
943                    } else {
944                        Ok(false)
945                    }
946                }
947            }
948            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
949        }
950    }
951
952    /// Returns `true` if split changed. Otherwise `false`.
953    async fn update_state_if_changed(
954        &mut self,
955        barrier_epoch: EpochPair,
956        target_splits: Vec<SplitImpl>,
957        stage: &mut BackfillStage,
958        should_trim_state: bool,
959    ) -> StreamExecutorResult<bool> {
960        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
961
962        let mut split_changed = false;
963        // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
964        // Will be set to target_state in the end.
965        let old_states = std::mem::take(&mut stage.states);
966        let committed_reader = self
967            .backfill_state_store
968            .new_committed_reader(barrier_epoch)
969            .await?;
970        // Iterate over the target (assigned) splits
971        // - check if any new splits are added
972        // - build target_state
973        for split in &target_splits {
974            let split_id = split.id();
975            if let Some(s) = old_states.get(&split_id) {
976                target_state.insert(split_id, s.clone());
977            } else {
978                split_changed = true;
979
980                let backfill_state = committed_reader
981                    .try_recover_from_state_store(&split_id)
982                    .await?;
983                match backfill_state {
984                    None => {
985                        // Newly added split. We don't need to backfill.
986                        // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
987                        target_state.insert(
988                            split_id,
989                            BackfillStateWithProgress {
990                                state: BackfillState::Finished,
991                                num_consumed_rows: 0,
992                                target_offset: None,
993                            },
994                        );
995                    }
996                    Some(backfill_state) => {
997                        // Migrated split. Backfill if unfinished.
998                        target_state.insert(split_id, backfill_state);
999                    }
1000                }
1001            }
1002        }
1003
1004        // Checks dropped splits
1005        for existing_split_id in old_states.keys() {
1006            if !target_state.contains_key(existing_split_id) {
1007                tracing::info!("split dropping detected: {}", existing_split_id);
1008                split_changed = true;
1009            }
1010        }
1011
1012        if split_changed {
1013            let dropped_splits = stage
1014                .states
1015                .extract_if(|split_id, _| !target_state.contains_key(split_id))
1016                .map(|(split_id, _)| split_id);
1017
1018            if should_trim_state {
1019                // trim dropped splits' state
1020                self.backfill_state_store.trim_state(dropped_splits).await?;
1021            }
1022            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
1023        } else {
1024            debug_assert_eq!(old_states, target_state);
1025        }
1026        stage.states = target_state;
1027        stage.splits = target_splits;
1028        stage.debug_assert_consistent();
1029        Ok(split_changed)
1030    }
1031
1032    /// For split change during forward stage, all newly added splits should be already finished.
1033    // We just need to update the state store if necessary.
1034    async fn apply_split_change_forward_stage_after_yield_barrier(
1035        &mut self,
1036        barrier_epoch: EpochPair,
1037        target_splits: Vec<SplitImpl>,
1038        states: &mut BackfillStates,
1039        should_trim_state: bool,
1040    ) -> StreamExecutorResult<()> {
1041        self.source_split_change_count.inc();
1042        {
1043            self.update_state_if_changed_forward_stage(
1044                barrier_epoch,
1045                target_splits,
1046                states,
1047                should_trim_state,
1048            )
1049            .await?;
1050        }
1051
1052        Ok(())
1053    }
1054
1055    async fn update_state_if_changed_forward_stage(
1056        &mut self,
1057        barrier_epoch: EpochPair,
1058        target_splits: Vec<SplitImpl>,
1059        states: &mut BackfillStates,
1060        should_trim_state: bool,
1061    ) -> StreamExecutorResult<()> {
1062        let target_splits: HashSet<SplitId> = target_splits
1063            .into_iter()
1064            .map(|split| (split.id()))
1065            .collect();
1066
1067        let mut split_changed = false;
1068        let mut newly_added_splits = vec![];
1069
1070        let committed_reader = self
1071            .backfill_state_store
1072            .new_committed_reader(barrier_epoch)
1073            .await?;
1074
1075        // Checks added splits
1076        for split_id in &target_splits {
1077            if !states.contains_key(split_id) {
1078                split_changed = true;
1079
1080                let backfill_state = committed_reader
1081                    .try_recover_from_state_store(split_id)
1082                    .await?;
1083                match &backfill_state {
1084                    None => {
1085                        // Newly added split. We don't need to backfill!
1086                        newly_added_splits.push(split_id.clone());
1087                    }
1088                    Some(backfill_state) => {
1089                        // Migrated split. It should also be finished since we are in forwarding stage.
1090                        match backfill_state.state {
1091                            BackfillState::Finished => {}
1092                            _ => {
1093                                return Err(anyhow::anyhow!(
1094                                    "Unexpected backfill state: {:?}",
1095                                    backfill_state
1096                                )
1097                                .into());
1098                            }
1099                        }
1100                    }
1101                }
1102                states.insert(
1103                    split_id.clone(),
1104                    backfill_state.unwrap_or(BackfillStateWithProgress {
1105                        state: BackfillState::Finished,
1106                        num_consumed_rows: 0,
1107                        target_offset: None,
1108                    }),
1109                );
1110            }
1111        }
1112
1113        // Checks dropped splits
1114        for existing_split_id in states.keys() {
1115            if !target_splits.contains(existing_split_id) {
1116                tracing::info!("split dropping detected: {}", existing_split_id);
1117                split_changed = true;
1118            }
1119        }
1120
1121        if split_changed {
1122            tracing::info!(
1123                target_splits = ?target_splits,
1124                "apply split change"
1125            );
1126
1127            let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1128
1129            if should_trim_state {
1130                // trim dropped splits' state
1131                self.backfill_state_store
1132                    .trim_state(dropped_splits.map(|(k, _v)| k))
1133                    .await?;
1134            }
1135
1136            // For migrated splits, and existing splits, we do not need to update
1137            // state store, but only for newly added splits.
1138            self.backfill_state_store
1139                .set_states(
1140                    newly_added_splits
1141                        .into_iter()
1142                        .map(|split_id| {
1143                            (
1144                                split_id,
1145                                BackfillStateWithProgress {
1146                                    state: BackfillState::Finished,
1147                                    num_consumed_rows: 0,
1148                                    target_offset: None,
1149                                },
1150                            )
1151                        })
1152                        .collect(),
1153                )
1154                .await?;
1155        }
1156
1157        Ok(())
1158    }
1159}
1160
/// Orders two Kafka offsets by their numeric value rather than lexicographically.
///
/// Both offsets are parsed as decimal `i64`; panics if either fails to parse.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let parse_offset = |offset: &str| offset.parse::<i64>().unwrap();
    let (lhs, rhs) = (parse_offset(a), parse_offset(b));
    lhs.cmp(&rhs)
}
1166
1167impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
1168    fn execute(self: Box<Self>) -> BoxedMessageStream {
1169        self.inner.execute(self.input).boxed()
1170    }
1171}
1172
1173impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1174    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1175        f.debug_struct("SourceBackfillExecutor")
1176            .field("source_id", &self.source_id)
1177            .field("column_ids", &self.column_ids)
1178            .field("pk_indices", &self.info.pk_indices)
1179            .finish()
1180    }
1181}
1182
1183struct PauseControl {
1184    // Paused due to backfill order control
1185    backfill_paused: bool,
1186    // Paused due to self-pause, e.g. let barrier catch up
1187    self_paused: bool,
1188    // Paused due to Pause command from meta, pause_on_next_bootstrap
1189    command_paused: bool,
1190    // reader paused
1191    reader_paused: bool,
1192}
1193
1194impl PauseControl {
1195    fn new() -> Self {
1196        Self {
1197            backfill_paused: false,
1198            self_paused: false,
1199            command_paused: false,
1200            reader_paused: false,
1201        }
1202    }
1203
1204    fn is_paused(&self) -> bool {
1205        self.backfill_paused || self.command_paused || self.self_paused
1206    }
1207
1208    /// returns whether we need to pause the reader.
1209    fn backfill_pause(&mut self) {
1210        if self.backfill_paused {
1211            tracing::warn!("backfill_pause invoked twice");
1212        }
1213        self.backfill_paused = true;
1214    }
1215
1216    /// returns whether we need to resume the reader.
1217    /// same precedence as command.
1218    fn backfill_resume(&mut self) -> bool {
1219        if !self.backfill_paused {
1220            tracing::warn!("backfill_resume invoked twice");
1221        }
1222        !self.command_paused
1223    }
1224
1225    /// returns whether we need to pause the reader.
1226    fn self_pause(&mut self) {
1227        assert!(
1228            !self.backfill_paused,
1229            "backfill stream should not be read when backfill_pause is set"
1230        );
1231        assert!(
1232            !self.command_paused,
1233            "backfill stream should not be read when command_pause is set"
1234        );
1235        if self.self_paused {
1236            tracing::warn!("self_pause invoked twice");
1237        }
1238        self.self_paused = true;
1239    }
1240
1241    /// returns whether we need to resume the reader.
1242    /// `self_resume` has the lowest precedence,
1243    /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
1244    fn self_resume(&mut self) -> bool {
1245        self.self_paused = false;
1246        !(self.backfill_paused || self.command_paused)
1247    }
1248
1249    /// returns whether we need to pause the reader.
1250    fn command_pause(&mut self) {
1251        if self.command_paused {
1252            tracing::warn!("command_pause invoked twice");
1253        }
1254        self.command_paused = true;
1255    }
1256
1257    /// returns whether we need to resume the reader.
1258    /// same precedence as backfill.
1259    fn command_resume(&mut self) -> bool {
1260        if !self.command_paused {
1261            tracing::warn!("command_resume invoked twice");
1262        }
1263        self.command_paused = false;
1264        !self.backfill_paused
1265    }
1266}