risingwave_stream/executor/source/
source_backfill_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::{ColumnId, TableId};
25use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
26use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::types::JsonbVal;
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::{
32    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
33    SplitMetaData,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use serde::{Deserialize, Serialize};
38use thiserror_ext::AsReport;
39
40use super::executor_core::StreamSourceCore;
41use super::source_backfill_state_table::BackfillStateTableHandler;
42use super::{apply_rate_limit, get_split_offset_col_idx};
43use crate::common::rate_limit::limited_chunk_size;
44use crate::executor::UpdateMutation;
45use crate::executor::prelude::*;
46use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
47use crate::task::CreateMviewProgressReporter;
48
49#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
50pub enum BackfillState {
51    /// `None` means not started yet. It's the initial state.
52    /// XXX: perhaps we can also set to low-watermark instead of `None`
53    Backfilling(Option<String>),
54    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
55    SourceCachingUp(String),
56    Finished,
57}
58pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
59
60/// Only `state` field is the real state for fail-over.
61/// Other fields are for observability (but we still need to persist them).
62#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
63pub struct BackfillStateWithProgress {
64    pub state: BackfillState,
65    pub num_consumed_rows: u64,
66    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
67    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
68    /// so that we can finish backfilling even when upstream doesn't emit any data.
69    pub target_offset: Option<String>,
70}
71
72impl BackfillStateWithProgress {
73    pub fn encode_to_json(self) -> JsonbVal {
74        serde_json::to_value(self).unwrap().into()
75    }
76
77    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
78        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
79    }
80}
81
82pub struct SourceBackfillExecutor<S: StateStore> {
83    pub inner: SourceBackfillExecutorInner<S>,
84    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
85    pub input: Executor,
86}
87
88pub struct SourceBackfillExecutorInner<S: StateStore> {
89    actor_ctx: ActorContextRef,
90    info: ExecutorInfo,
91
92    /// Streaming source for external
93    source_id: TableId,
94    source_name: String,
95    column_ids: Vec<ColumnId>,
96    source_desc_builder: Option<SourceDescBuilder>,
97    backfill_state_store: BackfillStateTableHandler<S>,
98
99    /// Metrics for monitor.
100    metrics: Arc<StreamingMetrics>,
101    source_split_change_count: LabelGuardedIntCounter<4>,
102
103    // /// Receiver of barrier channel.
104    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
105    /// System parameter reader to read barrier interval
106    system_params: SystemParamsReaderRef,
107
108    /// Rate limit in rows/s.
109    rate_limit_rps: Option<u32>,
110
111    progress: CreateMviewProgressReporter,
112}
113
114/// Local variables used in the backfill stage.
115///
116/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
117///
118/// Note: all off the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
119#[derive(Debug)]
120struct BackfillStage {
121    states: BackfillStates,
122    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
123    ///
124    /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
125    splits: Vec<SplitImpl>,
126}
127
128impl BackfillStage {
129    fn total_backfilled_rows(&self) -> u64 {
130        self.states.values().map(|s| s.num_consumed_rows).sum()
131    }
132
133    fn debug_assert_consistent(&self) {
134        if cfg!(debug_assertions) {
135            let all_splits: HashSet<_> =
136                self.splits.iter().map(|split| split.id().clone()).collect();
137            assert_eq!(
138                self.states.keys().cloned().collect::<HashSet<_>>(),
139                all_splits
140            );
141        }
142    }
143
144    /// Get unfinished splits with latest offsets according to the backfill states.
145    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
146        let mut unfinished_splits = Vec::new();
147        for split in &self.splits {
148            let state = &self.states.get(split.id().as_ref()).unwrap().state;
149            match state {
150                BackfillState::Backfilling(Some(offset)) => {
151                    let mut updated_split = split.clone();
152                    updated_split.update_in_place(offset.clone())?;
153                    unfinished_splits.push(updated_split);
154                }
155                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
156                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
157            }
158        }
159        Ok(unfinished_splits)
160    }
161
162    /// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible.
163    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
164        let mut vis = false;
165        let state = self.states.get_mut(split_id).unwrap();
166        let state_inner = &mut state.state;
167        match state_inner {
168            BackfillState::Backfilling(None) => {
169                // backfilling for this split is not started yet. Ignore this row
170            }
171            BackfillState::Backfilling(Some(backfill_offset)) => {
172                match compare_kafka_offset(backfill_offset, offset) {
173                    Ordering::Less => {
174                        // continue backfilling. Ignore this row
175                    }
176                    Ordering::Equal => {
177                        // backfilling for this split is finished just right.
178                        *state_inner = BackfillState::Finished;
179                    }
180                    Ordering::Greater => {
181                        // backfilling for this split produced more data than current source's progress.
182                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
183                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
184                    }
185                }
186            }
187            BackfillState::SourceCachingUp(backfill_offset) => {
188                match compare_kafka_offset(backfill_offset, offset) {
189                    Ordering::Less => {
190                        // Source caught up, but doesn't contain the last backfilled row.
191                        // This may happen e.g., if Kafka performed compaction.
192                        vis = true;
193                        *state_inner = BackfillState::Finished;
194                    }
195                    Ordering::Equal => {
196                        // Source just caught up with backfilling.
197                        *state_inner = BackfillState::Finished;
198                    }
199                    Ordering::Greater => {
200                        // Source is still behind backfilling.
201                    }
202                }
203            }
204            BackfillState::Finished => {
205                vis = true;
206                // This split's backfilling is finished, we are waiting for other splits
207            }
208        }
209        if matches!(state_inner, BackfillState::Backfilling(_)) {
210            state.target_offset = Some(offset.to_owned());
211        }
212        if vis {
213            debug_assert_eq!(*state_inner, BackfillState::Finished);
214        }
215        vis
216    }
217
218    /// Updates backfill states and returns whether the row backfilled from external system is visible.
219    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
220        let state = self.states.get_mut(split_id).unwrap();
221        state.num_consumed_rows += 1;
222        let state_inner = &mut state.state;
223        match state_inner {
224            BackfillState::Backfilling(_old_offset) => {
225                if let Some(target_offset) = &state.target_offset
226                    && compare_kafka_offset(offset, target_offset).is_ge()
227                {
228                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
229                    // and dropping duplicated messages.
230                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
231                    // In this case, upstream hasn't reached the target_offset yet.
232                    //
233                    // Note2: after this, all following rows in the current chunk will be invisible.
234                    //
235                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
236                    // keep backfilling.
237                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
238                } else {
239                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
240                }
241                true
242            }
243            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
244                // backfilling stopped. ignore
245                false
246            }
247        }
248    }
249}
250
251impl<S: StateStore> SourceBackfillExecutorInner<S> {
252    #[expect(clippy::too_many_arguments)]
253    pub fn new(
254        actor_ctx: ActorContextRef,
255        info: ExecutorInfo,
256        stream_source_core: StreamSourceCore<S>,
257        metrics: Arc<StreamingMetrics>,
258        system_params: SystemParamsReaderRef,
259        backfill_state_store: BackfillStateTableHandler<S>,
260        rate_limit_rps: Option<u32>,
261        progress: CreateMviewProgressReporter,
262    ) -> Self {
263        let source_split_change_count = metrics
264            .source_split_change_count
265            .with_guarded_label_values(&[
266                &stream_source_core.source_id.to_string(),
267                &stream_source_core.source_name,
268                &actor_ctx.id.to_string(),
269                &actor_ctx.fragment_id.to_string(),
270            ]);
271
272        Self {
273            actor_ctx,
274            info,
275            source_id: stream_source_core.source_id,
276            source_name: stream_source_core.source_name,
277            column_ids: stream_source_core.column_ids,
278            source_desc_builder: stream_source_core.source_desc_builder,
279            backfill_state_store,
280            metrics,
281            source_split_change_count,
282            system_params,
283            rate_limit_rps,
284            progress,
285        }
286    }
287
288    async fn build_stream_source_reader(
289        &self,
290        source_desc: &SourceDesc,
291        splits: Vec<SplitImpl>,
292    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
293        let column_ids = source_desc
294            .columns
295            .iter()
296            .map(|column_desc| column_desc.column_id)
297            .collect_vec();
298        let source_ctx = SourceContext::new(
299            self.actor_ctx.id,
300            self.source_id,
301            self.actor_ctx.fragment_id,
302            self.source_name.clone(),
303            source_desc.metrics.clone(),
304            SourceCtrlOpts {
305                chunk_size: limited_chunk_size(self.rate_limit_rps),
306                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
307            },
308            source_desc.source.config.clone(),
309            None,
310        );
311
312        // We will check watermark to decide whether we need to backfill.
313        // e.g., when there's a Kafka topic-partition without any data,
314        // we don't need to backfill at all. But if we do not check here,
315        // the executor can only know it's finished when data coming in.
316        // For blocking DDL, this would be annoying.
317
318        let (stream, res) = source_desc
319            .source
320            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
321            .await
322            .map_err(StreamExecutorError::connector_error)?;
323        Ok((
324            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
325            res.backfill_info,
326        ))
327    }
328
329    #[try_stream(ok = Message, error = StreamExecutorError)]
330    async fn execute(mut self, input: Executor) {
331        let mut input = input.execute();
332
333        // Poll the upstream to get the first barrier.
334        let barrier = expect_first_barrier(&mut input).await?;
335        let first_epoch = barrier.epoch;
336        let owned_splits = barrier
337            .initial_split_assignment(self.actor_ctx.id)
338            .unwrap_or(&[])
339            .to_vec();
340        let is_pause_on_startup = barrier.is_pause_on_startup();
341        yield Message::Barrier(barrier);
342
343        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
344        let source_desc = source_desc_builder
345            .build()
346            .map_err(StreamExecutorError::connector_error)?;
347        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
348        else {
349            unreachable!("Partition and offset columns must be set.");
350        };
351
352        self.backfill_state_store.init_epoch(first_epoch).await?;
353
354        let mut backfill_states: BackfillStates = HashMap::new();
355        {
356            let committed_reader = self
357                .backfill_state_store
358                .new_committed_reader(first_epoch)
359                .await?;
360            for split in &owned_splits {
361                let split_id = split.id();
362                let backfill_state = committed_reader
363                    .try_recover_from_state_store(&split_id)
364                    .await?
365                    .unwrap_or(BackfillStateWithProgress {
366                        state: BackfillState::Backfilling(None),
367                        num_consumed_rows: 0,
368                        target_offset: None, // init with None
369                    });
370                backfill_states.insert(split_id, backfill_state);
371            }
372        }
373        let mut backfill_stage = BackfillStage {
374            states: backfill_states,
375            splits: owned_splits,
376        };
377        backfill_stage.debug_assert_consistent();
378
379        let (source_chunk_reader, backfill_info) = self
380            .build_stream_source_reader(
381                &source_desc,
382                backfill_stage.get_latest_unfinished_splits()?,
383            )
384            .instrument_await("source_build_reader")
385            .await?;
386        for (split_id, info) in &backfill_info {
387            let state = backfill_stage.states.get_mut(split_id).unwrap();
388            match info {
389                BackfillInfo::NoDataToBackfill => {
390                    state.state = BackfillState::Finished;
391                }
392                BackfillInfo::HasDataToBackfill { latest_offset } => {
393                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
394                    state.target_offset = Some(latest_offset.clone());
395                }
396            }
397        }
398        tracing::debug!(?backfill_stage, "source backfill started");
399
400        fn select_strategy(_: &mut ()) -> PollNext {
401            futures::stream::PollNext::Left
402        }
403
404        // We choose "preferring upstream" strategy here, because:
405        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
406        //   For chunks from upstream, they are simply dropped, so there's no much overhead.
407        //   So possibly this can also affect other running jobs less.
408        // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
409        let mut backfill_stream = select_with_strategy(
410            input.by_ref().map(Either::Left),
411            source_chunk_reader.map(Either::Right),
412            select_strategy,
413        );
414
415        type PausedReader = Option<impl Stream>;
416        let mut paused_reader: PausedReader = None;
417        let mut command_paused = false;
418
419        macro_rules! pause_reader {
420            () => {
421                let (left, right) = backfill_stream.into_inner();
422                backfill_stream = select_with_strategy(
423                    left,
424                    futures::stream::pending().boxed().map(Either::Right),
425                    select_strategy,
426                );
427                // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
428                paused_reader = Some(right);
429            };
430        }
431
432        // If the first barrier requires us to pause on startup, pause the stream.
433        if is_pause_on_startup {
434            command_paused = true;
435            pause_reader!();
436        }
437
438        let state_store = self
439            .backfill_state_store
440            .state_store()
441            .state_store()
442            .clone();
443        let table_id = self.backfill_state_store.state_store().table_id().into();
444        let mut state_table_initialized = false;
445        {
446            let source_backfill_row_count = self
447                .metrics
448                .source_backfill_row_count
449                .with_guarded_label_values(&[
450                    &self.source_id.to_string(),
451                    &self.source_name,
452                    &self.actor_ctx.id.to_string(),
453                    &self.actor_ctx.fragment_id.to_string(),
454                ]);
455
456            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
457            // milliseconds, considering some other latencies like network and cost in Meta.
458            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
459                as u128
460                * WAIT_BARRIER_MULTIPLE_TIMES;
461            let mut last_barrier_time = Instant::now();
462            let mut self_paused = false;
463
464            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
465            'backfill_loop: while let Some(either) = backfill_stream.next().await {
466                match either {
467                    // Upstream
468                    Either::Left(msg) => {
469                        let Ok(msg) = msg else {
470                            let e = msg.unwrap_err();
471                            tracing::warn!(
472                                error = ?e.as_report(),
473                                source_id = %self.source_id,
474                                "stream source reader error",
475                            );
476                            GLOBAL_ERROR_METRICS.user_source_error.report([
477                                "SourceReaderError".to_owned(),
478                                self.source_id.to_string(),
479                                self.source_name.to_owned(),
480                                self.actor_ctx.fragment_id.to_string(),
481                            ]);
482
483                            let (reader, _backfill_info) = self
484                                .build_stream_source_reader(
485                                    &source_desc,
486                                    backfill_stage.get_latest_unfinished_splits()?,
487                                )
488                                .await?;
489
490                            backfill_stream = select_with_strategy(
491                                input.by_ref().map(Either::Left),
492                                reader.map(Either::Right),
493                                select_strategy,
494                            );
495                            continue;
496                        };
497                        match msg {
498                            Message::Barrier(barrier) => {
499                                last_barrier_time = Instant::now();
500
501                                if self_paused {
502                                    // command_paused has a higher priority.
503                                    if !command_paused {
504                                        backfill_stream = select_with_strategy(
505                                            input.by_ref().map(Either::Left),
506                                            paused_reader
507                                                .take()
508                                                .expect("no paused reader to resume"),
509                                            select_strategy,
510                                        );
511                                    }
512                                    self_paused = false;
513                                }
514
515                                let mut split_changed = None;
516                                if let Some(ref mutation) = barrier.mutation.as_deref() {
517                                    match mutation {
518                                        Mutation::Pause => {
519                                            // pause_reader should not be invoked consecutively more than once.
520                                            if !command_paused {
521                                                pause_reader!();
522                                                command_paused = true;
523                                            } else {
524                                                tracing::warn!(command_paused, "unexpected pause");
525                                            }
526                                        }
527                                        Mutation::Resume => {
528                                            // pause_reader.take should not be invoked consecutively more than once.
529                                            if command_paused {
530                                                backfill_stream = select_with_strategy(
531                                                    input.by_ref().map(Either::Left),
532                                                    paused_reader
533                                                        .take()
534                                                        .expect("no paused reader to resume"),
535                                                    select_strategy,
536                                                );
537                                                command_paused = false;
538                                            } else {
539                                                tracing::warn!(command_paused, "unexpected resume");
540                                            }
541                                        }
542                                        Mutation::SourceChangeSplit(actor_splits) => {
543                                            tracing::info!(
544                                                actor_splits = ?actor_splits,
545                                                "source change split received"
546                                            );
547                                            split_changed = actor_splits
548                                                .get(&self.actor_ctx.id)
549                                                .cloned()
550                                                .map(|target_splits| (target_splits, true));
551                                        }
552                                        Mutation::Update(UpdateMutation {
553                                            actor_splits, ..
554                                        }) => {
555                                            split_changed = actor_splits
556                                                .get(&self.actor_ctx.id)
557                                                .cloned()
558                                                .map(|target_splits| (target_splits, false));
559                                        }
560                                        Mutation::Throttle(actor_to_apply) => {
561                                            if let Some(new_rate_limit) =
562                                                actor_to_apply.get(&self.actor_ctx.id)
563                                                && *new_rate_limit != self.rate_limit_rps
564                                            {
565                                                tracing::info!(
566                                                    "updating rate limit from {:?} to {:?}",
567                                                    self.rate_limit_rps,
568                                                    *new_rate_limit
569                                                );
570                                                self.rate_limit_rps = *new_rate_limit;
571                                                // rebuild reader
572                                                let (reader, _backfill_info) = self
573                                                    .build_stream_source_reader(
574                                                        &source_desc,
575                                                        backfill_stage
576                                                            .get_latest_unfinished_splits()?,
577                                                    )
578                                                    .await?;
579
580                                                backfill_stream = select_with_strategy(
581                                                    input.by_ref().map(Either::Left),
582                                                    reader.map(Either::Right),
583                                                    select_strategy,
584                                                );
585                                            }
586                                        }
587                                        _ => {}
588                                    }
589                                }
590                                async fn rebuild_reader_on_split_changed(
591                                    this: &SourceBackfillExecutorInner<impl StateStore>,
592                                    backfill_stage: &BackfillStage,
593                                    source_desc: &SourceDesc,
594                                ) -> StreamExecutorResult<BoxSourceChunkStream>
595                                {
596                                    // rebuild backfill_stream
597                                    // Note: we don't put this part in a method, due to some complex lifetime issues.
598
599                                    let latest_unfinished_splits =
600                                        backfill_stage.get_latest_unfinished_splits()?;
601                                    tracing::info!(
602                                        "actor {:?} apply source split change to {:?}",
603                                        this.actor_ctx.id,
604                                        latest_unfinished_splits
605                                    );
606
607                                    // Replace the source reader with a new one of the new state.
608                                    let (reader, _backfill_info) = this
609                                        .build_stream_source_reader(
610                                            source_desc,
611                                            latest_unfinished_splits,
612                                        )
613                                        .await?;
614
615                                    Ok(reader)
616                                }
617
618                                self.backfill_state_store
619                                    .set_states(backfill_stage.states.clone())
620                                    .await?;
621                                self.backfill_state_store.commit(barrier.epoch).await?;
622
623                                if self.should_report_finished(&backfill_stage.states) {
624                                    // drop the backfill kafka consumers
625                                    backfill_stream = select_with_strategy(
626                                        input.by_ref().map(Either::Left),
627                                        futures::stream::pending().boxed().map(Either::Right),
628                                        select_strategy,
629                                    );
630
631                                    self.progress.finish(
632                                        barrier.epoch,
633                                        backfill_stage.total_backfilled_rows(),
634                                    );
635
636                                    let barrier_epoch = barrier.epoch;
637                                    let is_checkpoint = barrier.is_checkpoint();
638                                    // yield barrier after reporting progress
639                                    yield Message::Barrier(barrier);
640
641                                    if let Some((target_splits, should_trim_state)) = split_changed
642                                    {
643                                        self.apply_split_change_after_yield_barrier(
644                                            barrier_epoch,
645                                            target_splits,
646                                            &mut backfill_stage,
647                                            should_trim_state,
648                                        )
649                                        .await?;
650                                    }
651
652                                    if !state_table_initialized {
653                                        if is_checkpoint {
654                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
655                                            // We wait for 2nd epoch
656                                            let epoch = barrier_epoch.prev;
657                                            tracing::info!("waiting for epoch: {}", epoch);
658                                            state_store
659                                                .try_wait_epoch(
660                                                    HummockReadEpoch::Committed(epoch),
661                                                    TryWaitEpochOptions { table_id },
662                                                )
663                                                .await?;
664                                            tracing::info!("finished waiting for epoch: {}", epoch);
665                                            state_table_initialized = true;
666                                        }
667                                    } else {
668                                        // After we reported finished, we still don't exit the loop.
669                                        // Because we need to handle split migration.
670                                        assert!(
671                                            state_table_initialized,
672                                            "state table should be initialized before checking backfill finished"
673                                        );
674                                        if self.backfill_finished(&backfill_stage.states).await? {
675                                            tracing::info!("source backfill finished");
676                                            break 'backfill_loop;
677                                        }
678                                    }
679                                } else {
680                                    self.progress.update_for_source_backfill(
681                                        barrier.epoch,
682                                        backfill_stage.total_backfilled_rows(),
683                                    );
684
685                                    let barrier_epoch = barrier.epoch;
686                                    // yield barrier after reporting progress
687                                    yield Message::Barrier(barrier);
688
689                                    if let Some((target_splits, should_trim_state)) = split_changed
690                                    {
691                                        if self
692                                            .apply_split_change_after_yield_barrier(
693                                                barrier_epoch,
694                                                target_splits,
695                                                &mut backfill_stage,
696                                                should_trim_state,
697                                            )
698                                            .await?
699                                        {
700                                            let reader = rebuild_reader_on_split_changed(
701                                                &self,
702                                                &backfill_stage,
703                                                &source_desc,
704                                            )
705                                            .await?;
706
707                                            backfill_stream = select_with_strategy(
708                                                input.by_ref().map(Either::Left),
709                                                reader.map(Either::Right),
710                                                select_strategy,
711                                            );
712                                        }
713                                    }
714                                }
715                            }
716                            Message::Chunk(chunk) => {
717                                // We need to iterate over all rows because there might be multiple splits in a chunk.
718                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
719                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
720
721                                for (i, (_, row)) in chunk.rows().enumerate() {
722                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
723                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
724                                    let vis = backfill_stage.handle_upstream_row(split, offset);
725                                    new_vis.set(i, vis);
726                                }
727                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
728                                let new_vis = new_vis.finish();
729                                if new_vis.count_ones() != 0 {
730                                    let new_chunk = chunk.clone_with_vis(new_vis);
731                                    yield Message::Chunk(new_chunk);
732                                }
733                            }
734                            Message::Watermark(_) => {
735                                // Ignore watermark during backfill.
736                            }
737                        }
738                    }
739                    // backfill
740                    Either::Right(msg) => {
741                        let chunk = msg?;
742
743                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
744                            assert!(!command_paused, "command_paused should be false");
745                            // pause_reader should not be invoked consecutively more than once.
746                            if !self_paused {
747                                pause_reader!();
748                            } else {
749                                tracing::warn!(self_paused, "unexpected self pause");
750                            }
751                            // Exceeds the max wait barrier time, the source will be paused.
752                            // Currently we can guarantee the
753                            // source is not paused since it received stream
754                            // chunks.
755                            self_paused = true;
756                            tracing::warn!(
757                                "source {} paused, wait barrier for {:?}",
758                                self.info.identity,
759                                last_barrier_time.elapsed()
760                            );
761
762                            // Only update `max_wait_barrier_time_ms` to capture
763                            // `barrier_interval_ms`
764                            // changes here to avoid frequently accessing the shared
765                            // `system_params`.
766                            max_wait_barrier_time_ms =
767                                self.system_params.load().barrier_interval_ms() as u128
768                                    * WAIT_BARRIER_MULTIPLE_TIMES;
769                        }
770                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
771
772                        for (i, (_, row)) in chunk.rows().enumerate() {
773                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
774                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
775                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
776                            new_vis.set(i, vis);
777                        }
778
779                        let new_vis = new_vis.finish();
780                        let card = new_vis.count_ones();
781                        if card != 0 {
782                            let new_chunk = chunk.clone_with_vis(new_vis);
783                            yield Message::Chunk(new_chunk);
784                            source_backfill_row_count.inc_by(card as u64);
785                        }
786                    }
787                }
788            }
789        }
790
791        std::mem::drop(backfill_stream);
792        let mut states = backfill_stage.states;
793        // Make sure `Finished` state is persisted.
794        self.backfill_state_store.set_states(states.clone()).await?;
795
796        // All splits finished backfilling. Now we only forward the source data.
797        #[for_await]
798        for msg in input {
799            let msg = msg?;
800            match msg {
801                Message::Barrier(barrier) => {
802                    let mut split_changed = None;
803                    if let Some(ref mutation) = barrier.mutation.as_deref() {
804                        match mutation {
805                            Mutation::Pause | Mutation::Resume => {
806                                // We don't need to do anything. Handled by upstream.
807                            }
808                            Mutation::SourceChangeSplit(actor_splits) => {
809                                tracing::info!(
810                                    actor_splits = ?actor_splits,
811                                    "source change split received"
812                                );
813                                split_changed = actor_splits
814                                    .get(&self.actor_ctx.id)
815                                    .cloned()
816                                    .map(|target_splits| (target_splits, &mut states, true));
817                            }
818                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
819                                split_changed = actor_splits
820                                    .get(&self.actor_ctx.id)
821                                    .cloned()
822                                    .map(|target_splits| (target_splits, &mut states, false));
823                            }
824                            _ => {}
825                        }
826                    }
827                    self.backfill_state_store.commit(barrier.epoch).await?;
828                    let barrier_epoch = barrier.epoch;
829                    yield Message::Barrier(barrier);
830
831                    if let Some((target_splits, state, should_trim_state)) = split_changed {
832                        self.apply_split_change_forward_stage_after_yield_barrier(
833                            barrier_epoch,
834                            target_splits,
835                            state,
836                            should_trim_state,
837                        )
838                        .await?;
839                    }
840                }
841                Message::Chunk(chunk) => {
842                    yield Message::Chunk(chunk);
843                }
844                Message::Watermark(watermark) => {
845                    yield Message::Watermark(watermark);
846                }
847            }
848        }
849    }
850
851    /// When we should call `progress.finish()` to let blocking DDL return.
852    /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
853    ///
854    /// Note: split migration (online scaling) is related with progress tracking.
855    /// - For foreground DDL, scaling is not allowed before progress is finished.
856    /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
857    ///
858    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
859    fn should_report_finished(&self, states: &BackfillStates) -> bool {
860        states.values().all(|state| {
861            matches!(
862                state.state,
863                BackfillState::Finished | BackfillState::SourceCachingUp(_)
864            )
865        })
866    }
867
868    /// All splits entered `Finished` state.
869    ///
870    /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
871    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
872    /// this actor, we still need to backfill it.
873    ///
874    /// Note: at the beginning, the actor will only read the state written by itself.
875    /// It needs to _wait until it can read all actors' written data_.
876    /// i.e., wait for the second checkpoint has been available.
877    ///
878    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
879    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
880        Ok(states
881            .values()
882            .all(|state| matches!(state.state, BackfillState::Finished))
883            && self
884                .backfill_state_store
885                .scan_may_stale()
886                .await?
887                .into_iter()
888                .all(|state| matches!(state.state, BackfillState::Finished)))
889    }
890
891    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
892    async fn apply_split_change_after_yield_barrier(
893        &mut self,
894        barrier_epoch: EpochPair,
895        target_splits: Vec<SplitImpl>,
896        stage: &mut BackfillStage,
897        should_trim_state: bool,
898    ) -> StreamExecutorResult<bool> {
899        self.source_split_change_count.inc();
900        {
901            if self
902                .update_state_if_changed(barrier_epoch, target_splits, stage, should_trim_state)
903                .await?
904            {
905                // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
906                return Ok(true);
907            }
908        }
909
910        Ok(false)
911    }
912
913    /// Returns `true` if split changed. Otherwise `false`.
914    async fn update_state_if_changed(
915        &mut self,
916        barrier_epoch: EpochPair,
917        target_splits: Vec<SplitImpl>,
918        stage: &mut BackfillStage,
919        should_trim_state: bool,
920    ) -> StreamExecutorResult<bool> {
921        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
922
923        let mut split_changed = false;
924        // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
925        // Will be set to target_state in the end.
926        let old_states = std::mem::take(&mut stage.states);
927        let committed_reader = self
928            .backfill_state_store
929            .new_committed_reader(barrier_epoch)
930            .await?;
931        // Iterate over the target (assigned) splits
932        // - check if any new splits are added
933        // - build target_state
934        for split in &target_splits {
935            let split_id = split.id();
936            if let Some(s) = old_states.get(&split_id) {
937                target_state.insert(split_id, s.clone());
938            } else {
939                split_changed = true;
940
941                let backfill_state = committed_reader
942                    .try_recover_from_state_store(&split_id)
943                    .await?;
944                match backfill_state {
945                    None => {
946                        // Newly added split. We don't need to backfill.
947                        // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
948                        target_state.insert(
949                            split_id,
950                            BackfillStateWithProgress {
951                                state: BackfillState::Finished,
952                                num_consumed_rows: 0,
953                                target_offset: None,
954                            },
955                        );
956                    }
957                    Some(backfill_state) => {
958                        // Migrated split. Backfill if unfinished.
959                        target_state.insert(split_id, backfill_state);
960                    }
961                }
962            }
963        }
964
965        // Checks dropped splits
966        for existing_split_id in old_states.keys() {
967            if !target_state.contains_key(existing_split_id) {
968                tracing::info!("split dropping detected: {}", existing_split_id);
969                split_changed = true;
970            }
971        }
972
973        if split_changed {
974            let dropped_splits = stage
975                .states
976                .extract_if(|split_id, _| !target_state.contains_key(split_id))
977                .map(|(split_id, _)| split_id);
978
979            if should_trim_state {
980                // trim dropped splits' state
981                self.backfill_state_store.trim_state(dropped_splits).await?;
982            }
983            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
984        } else {
985            debug_assert_eq!(old_states, target_state);
986        }
987        stage.states = target_state;
988        stage.splits = target_splits;
989        stage.debug_assert_consistent();
990        Ok(split_changed)
991    }
992
993    /// For split change during forward stage, all newly added splits should be already finished.
994    // We just need to update the state store if necessary.
995    async fn apply_split_change_forward_stage_after_yield_barrier(
996        &mut self,
997        barrier_epoch: EpochPair,
998        target_splits: Vec<SplitImpl>,
999        states: &mut BackfillStates,
1000        should_trim_state: bool,
1001    ) -> StreamExecutorResult<()> {
1002        self.source_split_change_count.inc();
1003        {
1004            self.update_state_if_changed_forward_stage(
1005                barrier_epoch,
1006                target_splits,
1007                states,
1008                should_trim_state,
1009            )
1010            .await?;
1011        }
1012
1013        Ok(())
1014    }
1015
1016    async fn update_state_if_changed_forward_stage(
1017        &mut self,
1018        barrier_epoch: EpochPair,
1019        target_splits: Vec<SplitImpl>,
1020        states: &mut BackfillStates,
1021        should_trim_state: bool,
1022    ) -> StreamExecutorResult<()> {
1023        let target_splits: HashSet<SplitId> = target_splits
1024            .into_iter()
1025            .map(|split| (split.id()))
1026            .collect();
1027
1028        let mut split_changed = false;
1029        let mut newly_added_splits = vec![];
1030
1031        let committed_reader = self
1032            .backfill_state_store
1033            .new_committed_reader(barrier_epoch)
1034            .await?;
1035
1036        // Checks added splits
1037        for split_id in &target_splits {
1038            if !states.contains_key(split_id) {
1039                split_changed = true;
1040
1041                let backfill_state = committed_reader
1042                    .try_recover_from_state_store(split_id)
1043                    .await?;
1044                match &backfill_state {
1045                    None => {
1046                        // Newly added split. We don't need to backfill!
1047                        newly_added_splits.push(split_id.clone());
1048                    }
1049                    Some(backfill_state) => {
1050                        // Migrated split. It should also be finished since we are in forwarding stage.
1051                        match backfill_state.state {
1052                            BackfillState::Finished => {}
1053                            _ => {
1054                                return Err(anyhow::anyhow!(
1055                                    "Unexpected backfill state: {:?}",
1056                                    backfill_state
1057                                )
1058                                .into());
1059                            }
1060                        }
1061                    }
1062                }
1063                states.insert(
1064                    split_id.clone(),
1065                    backfill_state.unwrap_or(BackfillStateWithProgress {
1066                        state: BackfillState::Finished,
1067                        num_consumed_rows: 0,
1068                        target_offset: None,
1069                    }),
1070                );
1071            }
1072        }
1073
1074        // Checks dropped splits
1075        for existing_split_id in states.keys() {
1076            if !target_splits.contains(existing_split_id) {
1077                tracing::info!("split dropping detected: {}", existing_split_id);
1078                split_changed = true;
1079            }
1080        }
1081
1082        if split_changed {
1083            tracing::info!(
1084                target_splits = ?target_splits,
1085                "apply split change"
1086            );
1087
1088            let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1089
1090            if should_trim_state {
1091                // trim dropped splits' state
1092                self.backfill_state_store
1093                    .trim_state(dropped_splits.map(|(k, _v)| k))
1094                    .await?;
1095            }
1096
1097            // For migrated splits, and existing splits, we do not need to update
1098            // state store, but only for newly added splits.
1099            self.backfill_state_store
1100                .set_states(
1101                    newly_added_splits
1102                        .into_iter()
1103                        .map(|split_id| {
1104                            (
1105                                split_id,
1106                                BackfillStateWithProgress {
1107                                    state: BackfillState::Finished,
1108                                    num_consumed_rows: 0,
1109                                    target_offset: None,
1110                                },
1111                            )
1112                        })
1113                        .collect(),
1114                )
1115                .await?;
1116        }
1117
1118        Ok(())
1119    }
1120}
1121
/// Compares two Kafka offsets numerically.
///
/// Offsets are carried around as strings, so they are parsed as `i64` before comparing;
/// a plain string comparison would order `"10"` before `"9"`.
///
/// # Panics
///
/// Panics if either offset is not a valid `i64`. The panic message names the offending
/// value. Such input presumably indicates a bug upstream (TODO confirm offsets always come
/// from the Kafka connector).
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let parse = |offset: &str| -> i64 {
        offset
            .parse()
            .unwrap_or_else(|e| panic!("invalid kafka offset {offset:?}: {e}"))
    };
    parse(a).cmp(&parse(b))
}
1127
1128impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
1129    fn execute(self: Box<Self>) -> BoxedMessageStream {
1130        self.inner.execute(self.input).boxed()
1131    }
1132}
1133
1134impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1135    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1136        f.debug_struct("SourceBackfillExecutor")
1137            .field("source_id", &self.source_id)
1138            .field("column_ids", &self.column_ids)
1139            .field("pk_indices", &self.info.pk_indices)
1140            .finish()
1141    }
1142}