risingwave_stream/executor/source/
source_backfill_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::{ColumnId, TableId};
25use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
26use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::types::JsonbVal;
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::{
32    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
33    SplitMetaData,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use serde::{Deserialize, Serialize};
38use thiserror_ext::AsReport;
39
40use super::executor_core::StreamSourceCore;
41use super::source_backfill_state_table::BackfillStateTableHandler;
42use super::{apply_rate_limit, get_split_offset_col_idx};
43use crate::common::rate_limit::limited_chunk_size;
44use crate::executor::UpdateMutation;
45use crate::executor::prelude::*;
46use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
47use crate::task::CreateMviewProgressReporter;
48
/// Backfill state of a single split.
///
/// As driven by `BackfillStage::handle_backfill_row` and `BackfillStage::handle_upstream_row`,
/// a split moves from `Backfilling` to `SourceCachingUp` or directly to `Finished`, and from
/// `SourceCachingUp` to `Finished`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
    /// Backfill is in progress. The payload is the offset of the last backfilled row.
    /// `None` means not started yet. It's the initial state.
    /// XXX: perhaps we can also set to low-watermark instead of `None`
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). Rows from upstream stay invisible until
    /// the upstream offset reaches or passes this offset.
    SourceCachingUp(String),
    /// Backfill of this split is done: rows from upstream are visible, and rows from the
    /// backfill reader are ignored.
    Finished,
}
/// Backfill state and progress of every split, keyed by split id.
pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
59
/// Backfill state of a split together with its progress information.
///
/// Only the `state` field is the real state for fail-over.
/// Other fields are for observability (but we still need to persist them).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct BackfillStateWithProgress {
    /// The state machine position of the split.
    pub state: BackfillState,
    /// Number of rows received from the backfill reader for this split. It is incremented for
    /// every backfill row, including rows that end up being ignored.
    pub num_consumed_rows: u64,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
    /// so that we can finish backfilling even when upstream doesn't emit any data.
    /// While the split is still `Backfilling`, it is overwritten with the offset of each row
    /// received from upstream.
    pub target_offset: Option<String>,
}
71
72impl BackfillStateWithProgress {
73    pub fn encode_to_json(self) -> JsonbVal {
74        serde_json::to_value(self).unwrap().into()
75    }
76
77    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
78        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
79    }
80}
81
/// Source backfill executor: pairs the backfill logic (`inner`) with the upstream executor
/// (`input`) whose messages it consumes.
pub struct SourceBackfillExecutor<S: StateStore> {
    /// Holds the configuration and state of the backfill and implements its main loop.
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    pub input: Executor,
}
87
/// Configuration and state owned by the source backfill executor, without its upstream input.
pub struct SourceBackfillExecutorInner<S: StateStore> {
    /// Context of the owning actor. Its `id` and `fragment_id` are used as metric labels and to
    /// pick the entries of barrier mutations that target this actor.
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Id of the source being backfilled. Used to build the `SourceContext` and as a label for
    /// metrics and error reports.
    source_id: TableId,
    /// Name of the source being backfilled. Used alongside `source_id`.
    source_name: String,
    /// Copied from `StreamSourceCore::column_ids` in `new`.
    column_ids: Vec<ColumnId>,
    /// Builder of the `SourceDesc`. It is taken (leaving `None`) when `execute` starts.
    source_desc_builder: Option<SourceDescBuilder>,
    /// Handler of the state table in which the per-split backfill states are persisted on
    /// every barrier.
    backfill_state_store: BackfillStateTableHandler<S>,

    /// Metrics for monitor.
    metrics: Arc<StreamingMetrics>,
    /// Counter labeled with source id, source name, actor id and fragment id.
    source_split_change_count: LabelGuardedIntCounter,

    // /// Receiver of barrier channel.
    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
    /// System parameter reader to read barrier interval
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s. Updated when a `Mutation::Throttle` targeting this actor arrives.
    rate_limit_rps: Option<u32>,

    /// Reporter used to report that the backfill has finished, together with the total number
    /// of backfilled rows.
    progress: CreateMviewProgressReporter,
}
113
/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
#[derive(Debug)]
struct BackfillStage {
    /// Backfill state and progress of every split assigned to the actor, keyed by split id.
    states: BackfillStates,
    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not updated. Should use the offsets in `states` to update them before use (see `get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}
127
/// A barrier mutation whose effect is recorded while the barrier is handled and applied only
/// after the barrier has been yielded downstream.
enum ApplyMutationAfterBarrier {
    /// The splits assigned to this actor changed.
    SourceChangeSplit {
        /// The new set of splits assigned to this actor.
        target_splits: Vec<SplitImpl>,
        /// `true` when built from `Mutation::SourceChangeSplit`, `false` when built from
        /// `Mutation::Update`.
        /// NOTE(review): presumably controls whether states of dropped splits are removed from
        /// the state table — confirm against the code that applies the mutation.
        should_trim_state: bool,
    },
    /// The connector properties of the source changed (built from
    /// `Mutation::ConnectorPropsChange` after the source reader has been updated).
    ConnectorPropsChange,
}
135
136impl BackfillStage {
137    fn total_backfilled_rows(&self) -> u64 {
138        self.states.values().map(|s| s.num_consumed_rows).sum()
139    }
140
141    fn debug_assert_consistent(&self) {
142        if cfg!(debug_assertions) {
143            let all_splits: HashSet<_> =
144                self.splits.iter().map(|split| split.id().clone()).collect();
145            assert_eq!(
146                self.states.keys().cloned().collect::<HashSet<_>>(),
147                all_splits
148            );
149        }
150    }
151
152    /// Get unfinished splits with latest offsets according to the backfill states.
153    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
154        let mut unfinished_splits = Vec::new();
155        for split in &self.splits {
156            let state = &self.states.get(split.id().as_ref()).unwrap().state;
157            match state {
158                BackfillState::Backfilling(Some(offset)) => {
159                    let mut updated_split = split.clone();
160                    updated_split.update_in_place(offset.clone())?;
161                    unfinished_splits.push(updated_split);
162                }
163                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
164                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
165            }
166        }
167        Ok(unfinished_splits)
168    }
169
170    /// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible.
171    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
172        let mut vis = false;
173        let state = self.states.get_mut(split_id).unwrap();
174        let state_inner = &mut state.state;
175        match state_inner {
176            BackfillState::Backfilling(None) => {
177                // backfilling for this split is not started yet. Ignore this row
178            }
179            BackfillState::Backfilling(Some(backfill_offset)) => {
180                match compare_kafka_offset(backfill_offset, offset) {
181                    Ordering::Less => {
182                        // continue backfilling. Ignore this row
183                    }
184                    Ordering::Equal => {
185                        // backfilling for this split is finished just right.
186                        *state_inner = BackfillState::Finished;
187                    }
188                    Ordering::Greater => {
189                        // backfilling for this split produced more data than current source's progress.
190                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
191                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
192                    }
193                }
194            }
195            BackfillState::SourceCachingUp(backfill_offset) => {
196                match compare_kafka_offset(backfill_offset, offset) {
197                    Ordering::Less => {
198                        // Source caught up, but doesn't contain the last backfilled row.
199                        // This may happen e.g., if Kafka performed compaction.
200                        vis = true;
201                        *state_inner = BackfillState::Finished;
202                    }
203                    Ordering::Equal => {
204                        // Source just caught up with backfilling.
205                        *state_inner = BackfillState::Finished;
206                    }
207                    Ordering::Greater => {
208                        // Source is still behind backfilling.
209                    }
210                }
211            }
212            BackfillState::Finished => {
213                vis = true;
214                // This split's backfilling is finished, we are waiting for other splits
215            }
216        }
217        if matches!(state_inner, BackfillState::Backfilling(_)) {
218            state.target_offset = Some(offset.to_owned());
219        }
220        if vis {
221            debug_assert_eq!(*state_inner, BackfillState::Finished);
222        }
223        vis
224    }
225
226    /// Updates backfill states and returns whether the row backfilled from external system is visible.
227    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
228        let state = self.states.get_mut(split_id).unwrap();
229        state.num_consumed_rows += 1;
230        let state_inner = &mut state.state;
231        match state_inner {
232            BackfillState::Backfilling(_old_offset) => {
233                if let Some(target_offset) = &state.target_offset
234                    && compare_kafka_offset(offset, target_offset).is_ge()
235                {
236                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
237                    // and dropping duplicated messages.
238                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
239                    // In this case, upstream hasn't reached the target_offset yet.
240                    //
241                    // Note2: after this, all following rows in the current chunk will be invisible.
242                    //
243                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
244                    // keep backfilling.
245                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
246                } else {
247                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
248                }
249                true
250            }
251            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
252                // backfilling stopped. ignore
253                false
254            }
255        }
256    }
257}
258
259impl<S: StateStore> SourceBackfillExecutorInner<S> {
260    #[expect(clippy::too_many_arguments)]
261    pub fn new(
262        actor_ctx: ActorContextRef,
263        info: ExecutorInfo,
264        stream_source_core: StreamSourceCore<S>,
265        metrics: Arc<StreamingMetrics>,
266        system_params: SystemParamsReaderRef,
267        backfill_state_store: BackfillStateTableHandler<S>,
268        rate_limit_rps: Option<u32>,
269        progress: CreateMviewProgressReporter,
270    ) -> Self {
271        let source_split_change_count = metrics
272            .source_split_change_count
273            .with_guarded_label_values(&[
274                &stream_source_core.source_id.to_string(),
275                &stream_source_core.source_name,
276                &actor_ctx.id.to_string(),
277                &actor_ctx.fragment_id.to_string(),
278            ]);
279
280        Self {
281            actor_ctx,
282            info,
283            source_id: stream_source_core.source_id,
284            source_name: stream_source_core.source_name,
285            column_ids: stream_source_core.column_ids,
286            source_desc_builder: stream_source_core.source_desc_builder,
287            backfill_state_store,
288            metrics,
289            source_split_change_count,
290            system_params,
291            rate_limit_rps,
292            progress,
293        }
294    }
295
296    async fn build_stream_source_reader(
297        &self,
298        source_desc: &SourceDesc,
299        splits: Vec<SplitImpl>,
300    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
301        let column_ids = source_desc
302            .columns
303            .iter()
304            .map(|column_desc| column_desc.column_id)
305            .collect_vec();
306        let source_ctx = SourceContext::new(
307            self.actor_ctx.id,
308            self.source_id,
309            self.actor_ctx.fragment_id,
310            self.source_name.clone(),
311            source_desc.metrics.clone(),
312            SourceCtrlOpts {
313                chunk_size: limited_chunk_size(self.rate_limit_rps),
314                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
315            },
316            source_desc.source.config.clone(),
317            None,
318        );
319
320        // We will check watermark to decide whether we need to backfill.
321        // e.g., when there's a Kafka topic-partition without any data,
322        // we don't need to backfill at all. But if we do not check here,
323        // the executor can only know it's finished when data coming in.
324        // For blocking DDL, this would be annoying.
325
326        let (stream, res) = source_desc
327            .source
328            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
329            .await
330            .map_err(StreamExecutorError::connector_error)?;
331        Ok((
332            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
333            res.backfill_info,
334        ))
335    }
336
337    #[try_stream(ok = Message, error = StreamExecutorError)]
338    async fn execute(mut self, input: Executor) {
339        let mut input = input.execute();
340
341        // Poll the upstream to get the first barrier.
342        let barrier = expect_first_barrier(&mut input).await?;
343        let first_epoch = barrier.epoch;
344        let owned_splits = barrier
345            .initial_split_assignment(self.actor_ctx.id)
346            .unwrap_or(&[])
347            .to_vec();
348
349        let mut pause_control = PauseControl::new();
350        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
351            pause_control.backfill_pause();
352        }
353        if barrier.is_pause_on_startup() {
354            pause_control.command_pause();
355        }
356        yield Message::Barrier(barrier);
357
358        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
359        let mut source_desc = source_desc_builder
360            .build()
361            .map_err(StreamExecutorError::connector_error)?;
362        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
363        else {
364            unreachable!("Partition and offset columns must be set.");
365        };
366
367        self.backfill_state_store.init_epoch(first_epoch).await?;
368
369        let mut backfill_states: BackfillStates = HashMap::new();
370        {
371            let committed_reader = self
372                .backfill_state_store
373                .new_committed_reader(first_epoch)
374                .await?;
375            for split in &owned_splits {
376                let split_id = split.id();
377                let backfill_state = committed_reader
378                    .try_recover_from_state_store(&split_id)
379                    .await?
380                    .unwrap_or(BackfillStateWithProgress {
381                        state: BackfillState::Backfilling(None),
382                        num_consumed_rows: 0,
383                        target_offset: None, // init with None
384                    });
385                backfill_states.insert(split_id, backfill_state);
386            }
387        }
388        let mut backfill_stage = BackfillStage {
389            states: backfill_states,
390            splits: owned_splits,
391        };
392        backfill_stage.debug_assert_consistent();
393
394        let (source_chunk_reader, backfill_info) = self
395            .build_stream_source_reader(
396                &source_desc,
397                backfill_stage.get_latest_unfinished_splits()?,
398            )
399            .instrument_await("source_build_reader")
400            .await?;
401        for (split_id, info) in &backfill_info {
402            let state = backfill_stage.states.get_mut(split_id).unwrap();
403            match info {
404                BackfillInfo::NoDataToBackfill => {
405                    state.state = BackfillState::Finished;
406                }
407                BackfillInfo::HasDataToBackfill { latest_offset } => {
408                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
409                    state.target_offset = Some(latest_offset.clone());
410                }
411            }
412        }
413        tracing::debug!(?backfill_stage, "source backfill started");
414
415        fn select_strategy(_: &mut ()) -> PollNext {
416            futures::stream::PollNext::Left
417        }
418
        // We choose the "preferring upstream" strategy here, because:
        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
        //   Chunks from upstream are simply dropped, so there's not much overhead.
        //   So this possibly also affects other running jobs less.
        // - When the upstream source becomes less busy, SourceBackfill can begin to catch up.
424        let mut backfill_stream = select_with_strategy(
425            input.by_ref().map(Either::Left),
426            source_chunk_reader.map(Either::Right),
427            select_strategy,
428        );
429
430        type PausedReader = Option<impl Stream>;
431        let mut paused_reader: PausedReader = None;
432
433        macro_rules! pause_reader {
434            () => {
435                if !pause_control.reader_paused {
436                    let (left, right) = backfill_stream.into_inner();
437                    backfill_stream = select_with_strategy(
438                        left,
439                        futures::stream::pending().boxed().map(Either::Right),
440                        select_strategy,
441                    );
442                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
443                    paused_reader = Some(right);
444                    pause_control.reader_paused = true;
445                }
446            };
447        }
448
449        macro_rules! resume_reader {
450            () => {
451                if pause_control.reader_paused {
452                    backfill_stream = select_with_strategy(
453                        input.by_ref().map(Either::Left),
454                        paused_reader
455                            .take()
456                            .expect("should have paused reader to resume"),
457                        select_strategy,
458                    );
459                    pause_control.reader_paused = false;
460                }
461            };
462        }
463
464        if pause_control.is_paused() {
465            pause_reader!();
466        }
467
468        let state_store = self
469            .backfill_state_store
470            .state_store()
471            .state_store()
472            .clone();
473        let table_id = self.backfill_state_store.state_store().table_id().into();
474        let mut state_table_initialized = false;
475        {
476            let source_backfill_row_count = self
477                .metrics
478                .source_backfill_row_count
479                .with_guarded_label_values(&[
480                    &self.source_id.to_string(),
481                    &self.source_name,
482                    &self.actor_ctx.id.to_string(),
483                    &self.actor_ctx.fragment_id.to_string(),
484                ]);
485
486            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
487            // milliseconds, considering some other latencies like network and cost in Meta.
488            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
489                as u128
490                * WAIT_BARRIER_MULTIPLE_TIMES;
491            let mut last_barrier_time = Instant::now();
492
493            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
494            'backfill_loop: while let Some(either) = backfill_stream.next().await {
495                match either {
496                    // Upstream
497                    Either::Left(msg) => {
498                        let Ok(msg) = msg else {
499                            let e = msg.unwrap_err();
500                            tracing::warn!(
501                                error = ?e.as_report(),
502                                source_id = %self.source_id,
503                                "stream source reader error",
504                            );
505                            GLOBAL_ERROR_METRICS.user_source_error.report([
506                                "SourceReaderError".to_owned(),
507                                self.source_id.to_string(),
508                                self.source_name.to_owned(),
509                                self.actor_ctx.fragment_id.to_string(),
510                            ]);
511
512                            let (reader, _backfill_info) = self
513                                .build_stream_source_reader(
514                                    &source_desc,
515                                    backfill_stage.get_latest_unfinished_splits()?,
516                                )
517                                .await?;
518
519                            backfill_stream = select_with_strategy(
520                                input.by_ref().map(Either::Left),
521                                reader.map(Either::Right),
522                                select_strategy,
523                            );
524                            continue;
525                        };
526                        match msg {
527                            Message::Barrier(barrier) => {
528                                last_barrier_time = Instant::now();
529
530                                if pause_control.self_resume() {
531                                    resume_reader!();
532                                }
533
534                                let mut maybe_muatation = None;
535                                if let Some(ref mutation) = barrier.mutation.as_deref() {
536                                    match mutation {
537                                        Mutation::Pause => {
538                                            // pause_reader should not be invoked consecutively more than once.
539                                            pause_control.command_pause();
540                                            pause_reader!();
541                                        }
542                                        Mutation::Resume => {
543                                            // pause_reader.take should not be invoked consecutively more than once.
544                                            if pause_control.command_resume() {
545                                                resume_reader!();
546                                            }
547                                        }
548                                        Mutation::StartFragmentBackfill { fragment_ids } => {
549                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
550                                                && pause_control.backfill_resume()
551                                            {
552                                                resume_reader!();
553                                            }
554                                        }
555                                        Mutation::SourceChangeSplit(actor_splits) => {
556                                            tracing::info!(
557                                                actor_splits = ?actor_splits,
558                                                "source change split received"
559                                            );
560                                            maybe_muatation = actor_splits
561                                                .get(&self.actor_ctx.id)
562                                                .cloned()
563                                                .map(|target_splits| {
564                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
565                                                        target_splits,
566                                                        should_trim_state: true,
567                                                    }
568                                                });
569                                        }
570                                        Mutation::Update(UpdateMutation {
571                                            actor_splits, ..
572                                        }) => {
573                                            maybe_muatation = actor_splits
574                                                .get(&self.actor_ctx.id)
575                                                .cloned()
576                                                .map(|target_splits| {
577                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
578                                                        target_splits,
579                                                        should_trim_state: false,
580                                                    }
581                                                });
582                                        }
583                                        Mutation::ConnectorPropsChange(maybe_mutation) => {
584                                            if let Some(props_plaintext) =
585                                                maybe_mutation.get(&self.source_id.table_id())
586                                            {
587                                                source_desc
588                                                    .update_reader(props_plaintext.clone())?;
589
590                                                maybe_muatation = Some(
591                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
592                                                );
593                                            }
594                                        }
595                                        Mutation::Throttle(actor_to_apply) => {
596                                            if let Some(new_rate_limit) =
597                                                actor_to_apply.get(&self.actor_ctx.id)
598                                                && *new_rate_limit != self.rate_limit_rps
599                                            {
600                                                tracing::info!(
601                                                    "updating rate limit from {:?} to {:?}",
602                                                    self.rate_limit_rps,
603                                                    *new_rate_limit
604                                                );
605                                                self.rate_limit_rps = *new_rate_limit;
606                                                // rebuild reader
607                                                let (reader, _backfill_info) = self
608                                                    .build_stream_source_reader(
609                                                        &source_desc,
610                                                        backfill_stage
611                                                            .get_latest_unfinished_splits()?,
612                                                    )
613                                                    .await?;
614
615                                                backfill_stream = select_with_strategy(
616                                                    input.by_ref().map(Either::Left),
617                                                    reader.map(Either::Right),
618                                                    select_strategy,
619                                                );
620                                            }
621                                        }
622                                        _ => {}
623                                    }
624                                }
625                                async fn rebuild_reader_on_split_changed(
626                                    this: &SourceBackfillExecutorInner<impl StateStore>,
627                                    backfill_stage: &BackfillStage,
628                                    source_desc: &SourceDesc,
629                                ) -> StreamExecutorResult<BoxSourceChunkStream>
630                                {
631                                    // rebuild backfill_stream
632                                    // Note: we don't put this part in a method, due to some complex lifetime issues.
633
634                                    let latest_unfinished_splits =
635                                        backfill_stage.get_latest_unfinished_splits()?;
636                                    tracing::info!(
637                                        "actor {:?} apply source split change to {:?}",
638                                        this.actor_ctx.id,
639                                        latest_unfinished_splits
640                                    );
641
642                                    // Replace the source reader with a new one of the new state.
643                                    let (reader, _backfill_info) = this
644                                        .build_stream_source_reader(
645                                            source_desc,
646                                            latest_unfinished_splits,
647                                        )
648                                        .await?;
649
650                                    Ok(reader)
651                                }
652
653                                self.backfill_state_store
654                                    .set_states(backfill_stage.states.clone())
655                                    .await?;
656                                self.backfill_state_store.commit(barrier.epoch).await?;
657
658                                if self.should_report_finished(&backfill_stage.states) {
659                                    // drop the backfill kafka consumers
660                                    backfill_stream = select_with_strategy(
661                                        input.by_ref().map(Either::Left),
662                                        futures::stream::pending().boxed().map(Either::Right),
663                                        select_strategy,
664                                    );
665
666                                    self.progress.finish(
667                                        barrier.epoch,
668                                        backfill_stage.total_backfilled_rows(),
669                                    );
670
671                                    let barrier_epoch = barrier.epoch;
672                                    let is_checkpoint = barrier.is_checkpoint();
673                                    // yield barrier after reporting progress
674                                    yield Message::Barrier(barrier);
675
676                                    if let Some(to_apply_mutation) = maybe_muatation {
677                                        self.apply_split_change_after_yield_barrier(
678                                            barrier_epoch,
679                                            &mut backfill_stage,
680                                            to_apply_mutation,
681                                        )
682                                        .await?;
683                                    }
684
685                                    if !state_table_initialized {
686                                        if is_checkpoint {
687                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
688                                            // We wait for 2nd epoch
689                                            let epoch = barrier_epoch.prev;
690                                            tracing::info!("waiting for epoch: {}", epoch);
691                                            state_store
692                                                .try_wait_epoch(
693                                                    HummockReadEpoch::Committed(epoch),
694                                                    TryWaitEpochOptions { table_id },
695                                                )
696                                                .await?;
697                                            tracing::info!("finished waiting for epoch: {}", epoch);
698                                            state_table_initialized = true;
699                                        }
700                                    } else {
701                                        // After we reported finished, we still don't exit the loop.
702                                        // Because we need to handle split migration.
703                                        assert!(
704                                            state_table_initialized,
705                                            "state table should be initialized before checking backfill finished"
706                                        );
707                                        if self.backfill_finished(&backfill_stage.states).await? {
708                                            tracing::info!("source backfill finished");
709                                            break 'backfill_loop;
710                                        }
711                                    }
712                                } else {
713                                    self.progress.update_for_source_backfill(
714                                        barrier.epoch,
715                                        backfill_stage.total_backfilled_rows(),
716                                    );
717
718                                    let barrier_epoch = barrier.epoch;
719                                    // yield barrier after reporting progress
720                                    yield Message::Barrier(barrier);
721
722                                    if let Some(to_apply_mutation) = maybe_muatation
723                                        && self
724                                            .apply_split_change_after_yield_barrier(
725                                                barrier_epoch,
726                                                &mut backfill_stage,
727                                                to_apply_mutation,
728                                            )
729                                            .await?
730                                    {
731                                        let reader = rebuild_reader_on_split_changed(
732                                            &self,
733                                            &backfill_stage,
734                                            &source_desc,
735                                        )
736                                        .await?;
737
738                                        backfill_stream = select_with_strategy(
739                                            input.by_ref().map(Either::Left),
740                                            reader.map(Either::Right),
741                                            select_strategy,
742                                        );
743                                    }
744                                }
745                            }
746                            Message::Chunk(chunk) => {
747                                // We need to iterate over all rows because there might be multiple splits in a chunk.
748                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
749                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
750
751                                for (i, (_, row)) in chunk.rows().enumerate() {
752                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
753                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
754                                    let vis = backfill_stage.handle_upstream_row(split, offset);
755                                    new_vis.set(i, vis);
756                                }
757                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
758                                let new_vis = new_vis.finish();
759                                if new_vis.count_ones() != 0 {
760                                    let new_chunk = chunk.clone_with_vis(new_vis);
761                                    yield Message::Chunk(new_chunk);
762                                }
763                            }
764                            Message::Watermark(_) => {
765                                // Ignore watermark during backfill.
766                            }
767                        }
768                    }
769                    // backfill
770                    Either::Right(msg) => {
771                        let chunk = msg?;
772
773                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
774                            // Pause to let barrier catch up via backpressure of snapshot stream.
775                            pause_control.self_pause();
776                            pause_reader!();
777
778                            // Exceeds the max wait barrier time, the source will be paused.
779                            // Currently we can guarantee the
780                            // source is not paused since it received stream
781                            // chunks.
782                            tracing::warn!(
783                                "source {} paused, wait barrier for {:?}",
784                                self.info.identity,
785                                last_barrier_time.elapsed()
786                            );
787
788                            // Only update `max_wait_barrier_time_ms` to capture
789                            // `barrier_interval_ms`
790                            // changes here to avoid frequently accessing the shared
791                            // `system_params`.
792                            max_wait_barrier_time_ms =
793                                self.system_params.load().barrier_interval_ms() as u128
794                                    * WAIT_BARRIER_MULTIPLE_TIMES;
795                        }
796                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
797
798                        for (i, (_, row)) in chunk.rows().enumerate() {
799                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
800                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
801                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
802                            new_vis.set(i, vis);
803                        }
804
805                        let new_vis = new_vis.finish();
806                        let card = new_vis.count_ones();
807                        if card != 0 {
808                            let new_chunk = chunk.clone_with_vis(new_vis);
809                            yield Message::Chunk(new_chunk);
810                            source_backfill_row_count.inc_by(card as u64);
811                        }
812                    }
813                }
814            }
815        }
816
817        std::mem::drop(backfill_stream);
818        let mut states = backfill_stage.states;
819        // Make sure `Finished` state is persisted.
820        self.backfill_state_store.set_states(states.clone()).await?;
821
822        // All splits finished backfilling. Now we only forward the source data.
823        #[for_await]
824        for msg in input {
825            let msg = msg?;
826            match msg {
827                Message::Barrier(barrier) => {
828                    let mut split_changed = None;
829                    if let Some(ref mutation) = barrier.mutation.as_deref() {
830                        match mutation {
831                            Mutation::Pause | Mutation::Resume => {
832                                // We don't need to do anything. Handled by upstream.
833                            }
834                            Mutation::SourceChangeSplit(actor_splits) => {
835                                tracing::info!(
836                                    actor_splits = ?actor_splits,
837                                    "source change split received"
838                                );
839                                split_changed = actor_splits
840                                    .get(&self.actor_ctx.id)
841                                    .cloned()
842                                    .map(|target_splits| (target_splits, &mut states, true));
843                            }
844                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
845                                split_changed = actor_splits
846                                    .get(&self.actor_ctx.id)
847                                    .cloned()
848                                    .map(|target_splits| (target_splits, &mut states, false));
849                            }
850                            _ => {}
851                        }
852                    }
853                    self.backfill_state_store.commit(barrier.epoch).await?;
854                    let barrier_epoch = barrier.epoch;
855                    yield Message::Barrier(barrier);
856
857                    if let Some((target_splits, state, should_trim_state)) = split_changed {
858                        self.apply_split_change_forward_stage_after_yield_barrier(
859                            barrier_epoch,
860                            target_splits,
861                            state,
862                            should_trim_state,
863                        )
864                        .await?;
865                    }
866                }
867                Message::Chunk(chunk) => {
868                    yield Message::Chunk(chunk);
869                }
870                Message::Watermark(watermark) => {
871                    yield Message::Watermark(watermark);
872                }
873            }
874        }
875    }
876
877    /// When we should call `progress.finish()` to let blocking DDL return.
878    /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
879    ///
880    /// Note: split migration (online scaling) is related with progress tracking.
881    /// - For foreground DDL, scaling is not allowed before progress is finished.
882    /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
883    ///
884    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
885    fn should_report_finished(&self, states: &BackfillStates) -> bool {
886        states.values().all(|state| {
887            matches!(
888                state.state,
889                BackfillState::Finished | BackfillState::SourceCachingUp(_)
890            )
891        })
892    }
893
894    /// All splits entered `Finished` state.
895    ///
896    /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
897    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
898    /// this actor, we still need to backfill it.
899    ///
900    /// Note: at the beginning, the actor will only read the state written by itself.
901    /// It needs to _wait until it can read all actors' written data_.
902    /// i.e., wait for the second checkpoint has been available.
903    ///
904    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
905    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
906        Ok(states
907            .values()
908            .all(|state| matches!(state.state, BackfillState::Finished))
909            && self
910                .backfill_state_store
911                .scan_may_stale()
912                .await?
913                .into_iter()
914                .all(|state| matches!(state.state, BackfillState::Finished)))
915    }
916
917    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
918    async fn apply_split_change_after_yield_barrier(
919        &mut self,
920        barrier_epoch: EpochPair,
921        stage: &mut BackfillStage,
922        to_apply_mutation: ApplyMutationAfterBarrier,
923    ) -> StreamExecutorResult<bool> {
924        match to_apply_mutation {
925            ApplyMutationAfterBarrier::SourceChangeSplit {
926                target_splits,
927                should_trim_state,
928            } => {
929                self.source_split_change_count.inc();
930                {
931                    if self
932                        .update_state_if_changed(
933                            barrier_epoch,
934                            target_splits,
935                            stage,
936                            should_trim_state,
937                        )
938                        .await?
939                    {
940                        // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
941                        Ok(true)
942                    } else {
943                        Ok(false)
944                    }
945                }
946            }
947            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
948        }
949    }
950
951    /// Returns `true` if split changed. Otherwise `false`.
952    async fn update_state_if_changed(
953        &mut self,
954        barrier_epoch: EpochPair,
955        target_splits: Vec<SplitImpl>,
956        stage: &mut BackfillStage,
957        should_trim_state: bool,
958    ) -> StreamExecutorResult<bool> {
959        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
960
961        let mut split_changed = false;
962        // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
963        // Will be set to target_state in the end.
964        let old_states = std::mem::take(&mut stage.states);
965        let committed_reader = self
966            .backfill_state_store
967            .new_committed_reader(barrier_epoch)
968            .await?;
969        // Iterate over the target (assigned) splits
970        // - check if any new splits are added
971        // - build target_state
972        for split in &target_splits {
973            let split_id = split.id();
974            if let Some(s) = old_states.get(&split_id) {
975                target_state.insert(split_id, s.clone());
976            } else {
977                split_changed = true;
978
979                let backfill_state = committed_reader
980                    .try_recover_from_state_store(&split_id)
981                    .await?;
982                match backfill_state {
983                    None => {
984                        // Newly added split. We don't need to backfill.
985                        // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
986                        target_state.insert(
987                            split_id,
988                            BackfillStateWithProgress {
989                                state: BackfillState::Finished,
990                                num_consumed_rows: 0,
991                                target_offset: None,
992                            },
993                        );
994                    }
995                    Some(backfill_state) => {
996                        // Migrated split. Backfill if unfinished.
997                        target_state.insert(split_id, backfill_state);
998                    }
999                }
1000            }
1001        }
1002
1003        // Checks dropped splits
1004        for existing_split_id in old_states.keys() {
1005            if !target_state.contains_key(existing_split_id) {
1006                tracing::info!("split dropping detected: {}", existing_split_id);
1007                split_changed = true;
1008            }
1009        }
1010
1011        if split_changed {
1012            let dropped_splits = stage
1013                .states
1014                .extract_if(|split_id, _| !target_state.contains_key(split_id))
1015                .map(|(split_id, _)| split_id);
1016
1017            if should_trim_state {
1018                // trim dropped splits' state
1019                self.backfill_state_store.trim_state(dropped_splits).await?;
1020            }
1021            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
1022        } else {
1023            debug_assert_eq!(old_states, target_state);
1024        }
1025        stage.states = target_state;
1026        stage.splits = target_splits;
1027        stage.debug_assert_consistent();
1028        Ok(split_changed)
1029    }
1030
1031    /// For split change during forward stage, all newly added splits should be already finished.
1032    // We just need to update the state store if necessary.
1033    async fn apply_split_change_forward_stage_after_yield_barrier(
1034        &mut self,
1035        barrier_epoch: EpochPair,
1036        target_splits: Vec<SplitImpl>,
1037        states: &mut BackfillStates,
1038        should_trim_state: bool,
1039    ) -> StreamExecutorResult<()> {
1040        self.source_split_change_count.inc();
1041        {
1042            self.update_state_if_changed_forward_stage(
1043                barrier_epoch,
1044                target_splits,
1045                states,
1046                should_trim_state,
1047            )
1048            .await?;
1049        }
1050
1051        Ok(())
1052    }
1053
1054    async fn update_state_if_changed_forward_stage(
1055        &mut self,
1056        barrier_epoch: EpochPair,
1057        target_splits: Vec<SplitImpl>,
1058        states: &mut BackfillStates,
1059        should_trim_state: bool,
1060    ) -> StreamExecutorResult<()> {
1061        let target_splits: HashSet<SplitId> = target_splits
1062            .into_iter()
1063            .map(|split| (split.id()))
1064            .collect();
1065
1066        let mut split_changed = false;
1067        let mut newly_added_splits = vec![];
1068
1069        let committed_reader = self
1070            .backfill_state_store
1071            .new_committed_reader(barrier_epoch)
1072            .await?;
1073
1074        // Checks added splits
1075        for split_id in &target_splits {
1076            if !states.contains_key(split_id) {
1077                split_changed = true;
1078
1079                let backfill_state = committed_reader
1080                    .try_recover_from_state_store(split_id)
1081                    .await?;
1082                match &backfill_state {
1083                    None => {
1084                        // Newly added split. We don't need to backfill!
1085                        newly_added_splits.push(split_id.clone());
1086                    }
1087                    Some(backfill_state) => {
1088                        // Migrated split. It should also be finished since we are in forwarding stage.
1089                        match backfill_state.state {
1090                            BackfillState::Finished => {}
1091                            _ => {
1092                                return Err(anyhow::anyhow!(
1093                                    "Unexpected backfill state: {:?}",
1094                                    backfill_state
1095                                )
1096                                .into());
1097                            }
1098                        }
1099                    }
1100                }
1101                states.insert(
1102                    split_id.clone(),
1103                    backfill_state.unwrap_or(BackfillStateWithProgress {
1104                        state: BackfillState::Finished,
1105                        num_consumed_rows: 0,
1106                        target_offset: None,
1107                    }),
1108                );
1109            }
1110        }
1111
1112        // Checks dropped splits
1113        for existing_split_id in states.keys() {
1114            if !target_splits.contains(existing_split_id) {
1115                tracing::info!("split dropping detected: {}", existing_split_id);
1116                split_changed = true;
1117            }
1118        }
1119
1120        if split_changed {
1121            tracing::info!(
1122                target_splits = ?target_splits,
1123                "apply split change"
1124            );
1125
1126            let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1127
1128            if should_trim_state {
1129                // trim dropped splits' state
1130                self.backfill_state_store
1131                    .trim_state(dropped_splits.map(|(k, _v)| k))
1132                    .await?;
1133            }
1134
1135            // For migrated splits, and existing splits, we do not need to update
1136            // state store, but only for newly added splits.
1137            self.backfill_state_store
1138                .set_states(
1139                    newly_added_splits
1140                        .into_iter()
1141                        .map(|split_id| {
1142                            (
1143                                split_id,
1144                                BackfillStateWithProgress {
1145                                    state: BackfillState::Finished,
1146                                    num_consumed_rows: 0,
1147                                    target_offset: None,
1148                                },
1149                            )
1150                        })
1151                        .collect(),
1152                )
1153                .await?;
1154        }
1155
1156        Ok(())
1157    }
1158}
1159
/// Compares two Kafka offsets given as decimal strings by their numeric (`i64`) value.
///
/// # Panics
///
/// Panics if either argument cannot be parsed as an `i64`.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let parse_offset = |raw: &str| raw.parse::<i64>().unwrap();
    let lhs = parse_offset(a);
    let rhs = parse_offset(b);
    lhs.cmp(&rhs)
}
1165
1166impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
1167    fn execute(self: Box<Self>) -> BoxedMessageStream {
1168        self.inner.execute(self.input).boxed()
1169    }
1170}
1171
1172impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1173    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1174        f.debug_struct("SourceBackfillExecutor")
1175            .field("source_id", &self.source_id)
1176            .field("column_ids", &self.column_ids)
1177            .field("pk_indices", &self.info.pk_indices)
1178            .finish()
1179    }
1180}
1181
/// Tracks the independent reasons for which the backfill reader may be paused.
struct PauseControl {
    /// Paused due to backfill order control.
    backfill_paused: bool,
    /// Paused due to self-pause, e.g. to let the barrier catch up.
    self_paused: bool,
    /// Paused due to a Pause command from meta, or `pause_on_next_bootstrap`.
    command_paused: bool,
    /// Whether the reader is paused.
    reader_paused: bool,
}
1192
1193impl PauseControl {
1194    fn new() -> Self {
1195        Self {
1196            backfill_paused: false,
1197            self_paused: false,
1198            command_paused: false,
1199            reader_paused: false,
1200        }
1201    }
1202
1203    fn is_paused(&self) -> bool {
1204        self.backfill_paused || self.command_paused || self.self_paused
1205    }
1206
1207    /// returns whether we need to pause the reader.
1208    fn backfill_pause(&mut self) {
1209        if self.backfill_paused {
1210            tracing::warn!("backfill_pause invoked twice");
1211        }
1212        self.backfill_paused = true;
1213    }
1214
1215    /// returns whether we need to resume the reader.
1216    /// same precedence as command.
1217    fn backfill_resume(&mut self) -> bool {
1218        if !self.backfill_paused {
1219            tracing::warn!("backfill_resume invoked twice");
1220        }
1221        !self.command_paused
1222    }
1223
1224    /// returns whether we need to pause the reader.
1225    fn self_pause(&mut self) {
1226        assert!(
1227            !self.backfill_paused,
1228            "backfill stream should not be read when backfill_pause is set"
1229        );
1230        assert!(
1231            !self.command_paused,
1232            "backfill stream should not be read when command_pause is set"
1233        );
1234        if self.self_paused {
1235            tracing::warn!("self_pause invoked twice");
1236        }
1237        self.self_paused = true;
1238    }
1239
1240    /// returns whether we need to resume the reader.
1241    /// `self_resume` has the lowest precedence,
1242    /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
1243    fn self_resume(&mut self) -> bool {
1244        self.self_paused = false;
1245        !(self.backfill_paused || self.command_paused)
1246    }
1247
1248    /// returns whether we need to pause the reader.
1249    fn command_pause(&mut self) {
1250        if self.command_paused {
1251            tracing::warn!("command_pause invoked twice");
1252        }
1253        self.command_paused = true;
1254    }
1255
1256    /// returns whether we need to resume the reader.
1257    /// same precedence as backfill.
1258    fn command_resume(&mut self) -> bool {
1259        if !self.command_paused {
1260            tracing::warn!("command_resume invoked twice");
1261        }
1262        self.command_paused = false;
1263        !self.backfill_paused
1264    }
1265}