risingwave_stream/executor/source/
source_backfill_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::{ColumnId, TableId};
25use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
26use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::types::JsonbVal;
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::{
32    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
33    SplitMetaData,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use serde::{Deserialize, Serialize};
38use thiserror_ext::AsReport;
39
40use super::executor_core::StreamSourceCore;
41use super::source_backfill_state_table::BackfillStateTableHandler;
42use super::{apply_rate_limit, get_split_offset_col_idx};
43use crate::common::rate_limit::limited_chunk_size;
44use crate::executor::UpdateMutation;
45use crate::executor::prelude::*;
46use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
47use crate::task::CreateMviewProgressReporter;
48
49#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
50pub enum BackfillState {
51    /// `None` means not started yet. It's the initial state.
52    /// XXX: perhaps we can also set to low-watermark instead of `None`
53    Backfilling(Option<String>),
54    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
55    SourceCachingUp(String),
56    Finished,
57}
58pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
59
60/// Only `state` field is the real state for fail-over.
61/// Other fields are for observability (but we still need to persist them).
62#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
63pub struct BackfillStateWithProgress {
64    pub state: BackfillState,
65    pub num_consumed_rows: u64,
66    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
67    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
68    /// so that we can finish backfilling even when upstream doesn't emit any data.
69    pub target_offset: Option<String>,
70}
71
72impl BackfillStateWithProgress {
73    pub fn encode_to_json(self) -> JsonbVal {
74        serde_json::to_value(self).unwrap().into()
75    }
76
77    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
78        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
79    }
80}
81
/// Pairs the executor state ([`SourceBackfillExecutorInner`]) with the upstream input it reads.
pub struct SourceBackfillExecutor<S: StateStore> {
    /// Executor state and logic.
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    pub input: Executor,
}
87
88pub struct SourceBackfillExecutorInner<S: StateStore> {
89    actor_ctx: ActorContextRef,
90    info: ExecutorInfo,
91
92    /// Streaming source for external
93    source_id: TableId,
94    source_name: String,
95    column_ids: Vec<ColumnId>,
96    source_desc_builder: Option<SourceDescBuilder>,
97    backfill_state_store: BackfillStateTableHandler<S>,
98
99    /// Metrics for monitor.
100    metrics: Arc<StreamingMetrics>,
101    source_split_change_count: LabelGuardedIntCounter,
102
103    // /// Receiver of barrier channel.
104    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
105    /// System parameter reader to read barrier interval
106    system_params: SystemParamsReaderRef,
107
108    /// Rate limit in rows/s.
109    rate_limit_rps: Option<u32>,
110
111    progress: CreateMviewProgressReporter,
112}
113
/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: both fields must cover exactly the same set of splits (checked by
/// `debug_assert_consistent`). This means looking up any split of `splits` in `states` and
/// calling `unwrap()` on the result is safe.
#[derive(Debug)]
struct BackfillStage {
    /// Backfill state and progress of each split, keyed by split id.
    states: BackfillStates,
    /// A copy of all splits (including unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets here are not updated. Before using a split, update it with the offset
    /// recorded in `states` (see `get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}
127
/// A change carried by a barrier's mutation. It is recorded while the barrier is handled and
/// applied only after that barrier has been yielded downstream.
enum ApplyMutationAfterBarrier {
    /// New split assignment for this actor, taken from a `SourceChangeSplit` or an `Update`
    /// mutation.
    SourceChangeSplit {
        target_splits: Vec<SplitImpl>,
        /// `true` when built from a `SourceChangeSplit` mutation, `false` when built from an
        /// `Update` mutation.
        /// NOTE(review): presumably controls whether the state of splits no longer owned is
        /// removed — confirm in `apply_split_change_after_yield_barrier`.
        should_trim_state: bool,
    },
    /// The connector properties of this source changed (`ConnectorPropsChange` mutation).
    ConnectorPropsChange,
}
135
136impl BackfillStage {
137    fn total_backfilled_rows(&self) -> u64 {
138        self.states.values().map(|s| s.num_consumed_rows).sum()
139    }
140
141    fn debug_assert_consistent(&self) {
142        if cfg!(debug_assertions) {
143            let all_splits: HashSet<_> = self.splits.iter().map(|split| split.id()).collect();
144            assert_eq!(
145                self.states.keys().cloned().collect::<HashSet<_>>(),
146                all_splits
147            );
148        }
149    }
150
151    /// Get unfinished splits with latest offsets according to the backfill states.
152    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
153        let mut unfinished_splits = Vec::new();
154        for split in &self.splits {
155            let state = &self.states.get(split.id().as_ref()).unwrap().state;
156            match state {
157                BackfillState::Backfilling(Some(offset)) => {
158                    let mut updated_split = split.clone();
159                    updated_split.update_in_place(offset.clone())?;
160                    unfinished_splits.push(updated_split);
161                }
162                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
163                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
164            }
165        }
166        Ok(unfinished_splits)
167    }
168
169    /// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible.
170    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
171        let mut vis = false;
172        let state = self.states.get_mut(split_id).unwrap();
173        let state_inner = &mut state.state;
174        match state_inner {
175            BackfillState::Backfilling(None) => {
176                // backfilling for this split is not started yet. Ignore this row
177            }
178            BackfillState::Backfilling(Some(backfill_offset)) => {
179                match compare_kafka_offset(backfill_offset, offset) {
180                    Ordering::Less => {
181                        // continue backfilling. Ignore this row
182                    }
183                    Ordering::Equal => {
184                        // backfilling for this split is finished just right.
185                        *state_inner = BackfillState::Finished;
186                    }
187                    Ordering::Greater => {
188                        // backfilling for this split produced more data than current source's progress.
189                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
190                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
191                    }
192                }
193            }
194            BackfillState::SourceCachingUp(backfill_offset) => {
195                match compare_kafka_offset(backfill_offset, offset) {
196                    Ordering::Less => {
197                        // Source caught up, but doesn't contain the last backfilled row.
198                        // This may happen e.g., if Kafka performed compaction.
199                        vis = true;
200                        *state_inner = BackfillState::Finished;
201                    }
202                    Ordering::Equal => {
203                        // Source just caught up with backfilling.
204                        *state_inner = BackfillState::Finished;
205                    }
206                    Ordering::Greater => {
207                        // Source is still behind backfilling.
208                    }
209                }
210            }
211            BackfillState::Finished => {
212                vis = true;
213                // This split's backfilling is finished, we are waiting for other splits
214            }
215        }
216        if matches!(state_inner, BackfillState::Backfilling(_)) {
217            state.target_offset = Some(offset.to_owned());
218        }
219        if vis {
220            debug_assert_eq!(*state_inner, BackfillState::Finished);
221        }
222        vis
223    }
224
225    /// Updates backfill states and returns whether the row backfilled from external system is visible.
226    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
227        let state = self.states.get_mut(split_id).unwrap();
228        state.num_consumed_rows += 1;
229        let state_inner = &mut state.state;
230        match state_inner {
231            BackfillState::Backfilling(_old_offset) => {
232                if let Some(target_offset) = &state.target_offset
233                    && compare_kafka_offset(offset, target_offset).is_ge()
234                {
235                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
236                    // and dropping duplicated messages.
237                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
238                    // In this case, upstream hasn't reached the target_offset yet.
239                    //
240                    // Note2: after this, all following rows in the current chunk will be invisible.
241                    //
242                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
243                    // keep backfilling.
244                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
245                } else {
246                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
247                }
248                true
249            }
250            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
251                // backfilling stopped. ignore
252                false
253            }
254        }
255    }
256}
257
258impl<S: StateStore> SourceBackfillExecutorInner<S> {
259    #[expect(clippy::too_many_arguments)]
260    pub fn new(
261        actor_ctx: ActorContextRef,
262        info: ExecutorInfo,
263        stream_source_core: StreamSourceCore<S>,
264        metrics: Arc<StreamingMetrics>,
265        system_params: SystemParamsReaderRef,
266        backfill_state_store: BackfillStateTableHandler<S>,
267        rate_limit_rps: Option<u32>,
268        progress: CreateMviewProgressReporter,
269    ) -> Self {
270        let source_split_change_count = metrics
271            .source_split_change_count
272            .with_guarded_label_values(&[
273                &stream_source_core.source_id.to_string(),
274                &stream_source_core.source_name,
275                &actor_ctx.id.to_string(),
276                &actor_ctx.fragment_id.to_string(),
277            ]);
278
279        Self {
280            actor_ctx,
281            info,
282            source_id: stream_source_core.source_id,
283            source_name: stream_source_core.source_name,
284            column_ids: stream_source_core.column_ids,
285            source_desc_builder: stream_source_core.source_desc_builder,
286            backfill_state_store,
287            metrics,
288            source_split_change_count,
289            system_params,
290            rate_limit_rps,
291            progress,
292        }
293    }
294
295    async fn build_stream_source_reader(
296        &self,
297        source_desc: &SourceDesc,
298        splits: Vec<SplitImpl>,
299    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
300        let column_ids = source_desc
301            .columns
302            .iter()
303            .map(|column_desc| column_desc.column_id)
304            .collect_vec();
305        let source_ctx = SourceContext::new(
306            self.actor_ctx.id,
307            self.source_id,
308            self.actor_ctx.fragment_id,
309            self.source_name.clone(),
310            source_desc.metrics.clone(),
311            SourceCtrlOpts {
312                chunk_size: limited_chunk_size(self.rate_limit_rps),
313                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
314            },
315            source_desc.source.config.clone(),
316            None,
317        );
318
319        // We will check watermark to decide whether we need to backfill.
320        // e.g., when there's a Kafka topic-partition without any data,
321        // we don't need to backfill at all. But if we do not check here,
322        // the executor can only know it's finished when data coming in.
323        // For blocking DDL, this would be annoying.
324
325        let (stream, res) = source_desc
326            .source
327            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
328            .await
329            .map_err(StreamExecutorError::connector_error)?;
330        Ok((
331            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
332            res.backfill_info,
333        ))
334    }
335
336    #[try_stream(ok = Message, error = StreamExecutorError)]
337    async fn execute(mut self, input: Executor) {
338        let mut input = input.execute();
339
340        // Poll the upstream to get the first barrier.
341        let barrier = expect_first_barrier(&mut input).await?;
342        let first_epoch = barrier.epoch;
343        let owned_splits = barrier
344            .initial_split_assignment(self.actor_ctx.id)
345            .unwrap_or(&[])
346            .to_vec();
347
348        let mut pause_control = PauseControl::new();
349        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
350            pause_control.backfill_pause();
351        }
352        if barrier.is_pause_on_startup() {
353            pause_control.command_pause();
354        }
355        yield Message::Barrier(barrier);
356
357        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
358        let mut source_desc = source_desc_builder
359            .build()
360            .map_err(StreamExecutorError::connector_error)?;
361        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
362        else {
363            unreachable!("Partition and offset columns must be set.");
364        };
365
366        self.backfill_state_store.init_epoch(first_epoch).await?;
367
368        let mut backfill_states: BackfillStates = HashMap::new();
369        {
370            let committed_reader = self
371                .backfill_state_store
372                .new_committed_reader(first_epoch)
373                .await?;
374            for split in &owned_splits {
375                let split_id = split.id();
376                let backfill_state = committed_reader
377                    .try_recover_from_state_store(&split_id)
378                    .await?
379                    .unwrap_or(BackfillStateWithProgress {
380                        state: BackfillState::Backfilling(None),
381                        num_consumed_rows: 0,
382                        target_offset: None, // init with None
383                    });
384                backfill_states.insert(split_id, backfill_state);
385            }
386        }
387        let mut backfill_stage = BackfillStage {
388            states: backfill_states,
389            splits: owned_splits,
390        };
391        backfill_stage.debug_assert_consistent();
392
393        let (source_chunk_reader, backfill_info) = self
394            .build_stream_source_reader(
395                &source_desc,
396                backfill_stage.get_latest_unfinished_splits()?,
397            )
398            .instrument_await("source_build_reader")
399            .await?;
400        for (split_id, info) in &backfill_info {
401            let state = backfill_stage.states.get_mut(split_id).unwrap();
402            match info {
403                BackfillInfo::NoDataToBackfill => {
404                    state.state = BackfillState::Finished;
405                }
406                BackfillInfo::HasDataToBackfill { latest_offset } => {
407                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
408                    state.target_offset = Some(latest_offset.clone());
409                }
410            }
411        }
412        tracing::debug!(?backfill_stage, "source backfill started");
413
414        fn select_strategy(_: &mut ()) -> PollNext {
415            futures::stream::PollNext::Left
416        }
417
418        // We choose "preferring upstream" strategy here, because:
419        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
420        //   For chunks from upstream, they are simply dropped, so there's no much overhead.
421        //   So possibly this can also affect other running jobs less.
422        // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
423        let mut backfill_stream = select_with_strategy(
424            input.by_ref().map(Either::Left),
425            source_chunk_reader.map(Either::Right),
426            select_strategy,
427        );
428
429        type PausedReader = Option<impl Stream>;
430        let mut paused_reader: PausedReader = None;
431
432        macro_rules! pause_reader {
433            () => {
434                if !pause_control.reader_paused {
435                    let (left, right) = backfill_stream.into_inner();
436                    backfill_stream = select_with_strategy(
437                        left,
438                        futures::stream::pending().boxed().map(Either::Right),
439                        select_strategy,
440                    );
441                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
442                    paused_reader = Some(right);
443                    pause_control.reader_paused = true;
444                }
445            };
446        }
447
448        macro_rules! resume_reader {
449            () => {
450                if pause_control.reader_paused {
451                    backfill_stream = select_with_strategy(
452                        input.by_ref().map(Either::Left),
453                        paused_reader
454                            .take()
455                            .expect("should have paused reader to resume"),
456                        select_strategy,
457                    );
458                    pause_control.reader_paused = false;
459                }
460            };
461        }
462
463        if pause_control.is_paused() {
464            pause_reader!();
465        }
466
467        let state_store = self
468            .backfill_state_store
469            .state_store()
470            .state_store()
471            .clone();
472        let table_id = self.backfill_state_store.state_store().table_id();
473        let mut state_table_initialized = false;
474        {
475            let source_backfill_row_count = self
476                .metrics
477                .source_backfill_row_count
478                .with_guarded_label_values(&[
479                    &self.source_id.to_string(),
480                    &self.source_name,
481                    &self.actor_ctx.id.to_string(),
482                    &self.actor_ctx.fragment_id.to_string(),
483                ]);
484
485            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
486            // milliseconds, considering some other latencies like network and cost in Meta.
487            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
488                as u128
489                * WAIT_BARRIER_MULTIPLE_TIMES;
490            let mut last_barrier_time = Instant::now();
491
492            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
493            'backfill_loop: while let Some(either) = backfill_stream.next().await {
494                match either {
495                    // Upstream
496                    Either::Left(msg) => {
497                        let Ok(msg) = msg else {
498                            let e = msg.unwrap_err();
499                            tracing::error!(
500                                error = ?e.as_report(),
501                                source_id = %self.source_id,
502                                "stream source reader error",
503                            );
504                            GLOBAL_ERROR_METRICS.user_source_error.report([
505                                "SourceReaderError".to_owned(),
506                                self.source_id.to_string(),
507                                self.source_name.clone(),
508                                self.actor_ctx.fragment_id.to_string(),
509                            ]);
510
511                            let (reader, _backfill_info) = self
512                                .build_stream_source_reader(
513                                    &source_desc,
514                                    backfill_stage.get_latest_unfinished_splits()?,
515                                )
516                                .await?;
517
518                            backfill_stream = select_with_strategy(
519                                input.by_ref().map(Either::Left),
520                                reader.map(Either::Right),
521                                select_strategy,
522                            );
523                            continue;
524                        };
525                        match msg {
526                            Message::Barrier(barrier) => {
527                                last_barrier_time = Instant::now();
528
529                                if pause_control.self_resume() {
530                                    resume_reader!();
531                                }
532
533                                let mut maybe_muatation = None;
534                                if let Some(ref mutation) = barrier.mutation.as_deref() {
535                                    match mutation {
536                                        Mutation::Pause => {
537                                            // pause_reader should not be invoked consecutively more than once.
538                                            pause_control.command_pause();
539                                            pause_reader!();
540                                        }
541                                        Mutation::Resume => {
542                                            // pause_reader.take should not be invoked consecutively more than once.
543                                            if pause_control.command_resume() {
544                                                resume_reader!();
545                                            }
546                                        }
547                                        Mutation::StartFragmentBackfill { fragment_ids } => {
548                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
549                                                && pause_control.backfill_resume()
550                                            {
551                                                resume_reader!();
552                                            }
553                                        }
554                                        Mutation::SourceChangeSplit(actor_splits) => {
555                                            tracing::info!(
556                                                actor_splits = ?actor_splits,
557                                                "source change split received"
558                                            );
559                                            maybe_muatation = actor_splits
560                                                .get(&self.actor_ctx.id)
561                                                .cloned()
562                                                .map(|target_splits| {
563                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
564                                                        target_splits,
565                                                        should_trim_state: true,
566                                                    }
567                                                });
568                                        }
569                                        Mutation::Update(UpdateMutation {
570                                            actor_splits, ..
571                                        }) => {
572                                            maybe_muatation = actor_splits
573                                                .get(&self.actor_ctx.id)
574                                                .cloned()
575                                                .map(|target_splits| {
576                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
577                                                        target_splits,
578                                                        should_trim_state: false,
579                                                    }
580                                                });
581                                        }
582                                        Mutation::ConnectorPropsChange(maybe_mutation) => {
583                                            if let Some(props_plaintext) =
584                                                maybe_mutation.get(&self.source_id.as_raw_id())
585                                            {
586                                                source_desc
587                                                    .update_reader(props_plaintext.clone())?;
588
589                                                maybe_muatation = Some(
590                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
591                                                );
592                                            }
593                                        }
594                                        Mutation::Throttle(actor_to_apply) => {
595                                            if let Some(new_rate_limit) =
596                                                actor_to_apply.get(&self.actor_ctx.id)
597                                                && *new_rate_limit != self.rate_limit_rps
598                                            {
599                                                tracing::info!(
600                                                    "updating rate limit from {:?} to {:?}",
601                                                    self.rate_limit_rps,
602                                                    *new_rate_limit
603                                                );
604                                                self.rate_limit_rps = *new_rate_limit;
605                                                // rebuild reader
606                                                let (reader, _backfill_info) = self
607                                                    .build_stream_source_reader(
608                                                        &source_desc,
609                                                        backfill_stage
610                                                            .get_latest_unfinished_splits()?,
611                                                    )
612                                                    .await?;
613
614                                                backfill_stream = select_with_strategy(
615                                                    input.by_ref().map(Either::Left),
616                                                    reader.map(Either::Right),
617                                                    select_strategy,
618                                                );
619                                            }
620                                        }
621                                        _ => {}
622                                    }
623                                }
624                                async fn rebuild_reader_on_split_changed(
625                                    this: &SourceBackfillExecutorInner<impl StateStore>,
626                                    backfill_stage: &BackfillStage,
627                                    source_desc: &SourceDesc,
628                                ) -> StreamExecutorResult<BoxSourceChunkStream>
629                                {
630                                    // rebuild backfill_stream
631                                    // Note: we don't put this part in a method, due to some complex lifetime issues.
632
633                                    let latest_unfinished_splits =
634                                        backfill_stage.get_latest_unfinished_splits()?;
635                                    tracing::info!(
636                                        "actor {:?} apply source split change to {:?}",
637                                        this.actor_ctx.id,
638                                        latest_unfinished_splits
639                                    );
640
641                                    // Replace the source reader with a new one of the new state.
642                                    let (reader, _backfill_info) = this
643                                        .build_stream_source_reader(
644                                            source_desc,
645                                            latest_unfinished_splits,
646                                        )
647                                        .await?;
648
649                                    Ok(reader)
650                                }
651
652                                self.backfill_state_store
653                                    .set_states(backfill_stage.states.clone())
654                                    .await?;
655                                self.backfill_state_store.commit(barrier.epoch).await?;
656
657                                if self.should_report_finished(&backfill_stage.states) {
658                                    // drop the backfill kafka consumers
659                                    backfill_stream = select_with_strategy(
660                                        input.by_ref().map(Either::Left),
661                                        futures::stream::pending().boxed().map(Either::Right),
662                                        select_strategy,
663                                    );
664
665                                    self.progress.finish(
666                                        barrier.epoch,
667                                        backfill_stage.total_backfilled_rows(),
668                                    );
669
670                                    let barrier_epoch = barrier.epoch;
671                                    let is_checkpoint = barrier.is_checkpoint();
672                                    // yield barrier after reporting progress
673                                    yield Message::Barrier(barrier);
674
675                                    if let Some(to_apply_mutation) = maybe_muatation {
676                                        self.apply_split_change_after_yield_barrier(
677                                            barrier_epoch,
678                                            &mut backfill_stage,
679                                            to_apply_mutation,
680                                        )
681                                        .await?;
682                                    }
683
684                                    if !state_table_initialized {
685                                        if is_checkpoint {
686                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
687                                            // We wait for 2nd epoch
688                                            let epoch = barrier_epoch.prev;
689                                            tracing::info!("waiting for epoch: {}", epoch);
690                                            state_store
691                                                .try_wait_epoch(
692                                                    HummockReadEpoch::Committed(epoch),
693                                                    TryWaitEpochOptions { table_id },
694                                                )
695                                                .await?;
696                                            tracing::info!("finished waiting for epoch: {}", epoch);
697                                            state_table_initialized = true;
698                                        }
699                                    } else {
700                                        // After we reported finished, we still don't exit the loop.
701                                        // Because we need to handle split migration.
702                                        assert!(
703                                            state_table_initialized,
704                                            "state table should be initialized before checking backfill finished"
705                                        );
706                                        if self.backfill_finished(&backfill_stage.states).await? {
707                                            tracing::info!("source backfill finished");
708                                            break 'backfill_loop;
709                                        }
710                                    }
711                                } else {
712                                    self.progress.update_for_source_backfill(
713                                        barrier.epoch,
714                                        backfill_stage.total_backfilled_rows(),
715                                    );
716
717                                    let barrier_epoch = barrier.epoch;
718                                    // yield barrier after reporting progress
719                                    yield Message::Barrier(barrier);
720
721                                    if let Some(to_apply_mutation) = maybe_muatation
722                                        && self
723                                            .apply_split_change_after_yield_barrier(
724                                                barrier_epoch,
725                                                &mut backfill_stage,
726                                                to_apply_mutation,
727                                            )
728                                            .await?
729                                    {
730                                        let reader = rebuild_reader_on_split_changed(
731                                            &self,
732                                            &backfill_stage,
733                                            &source_desc,
734                                        )
735                                        .await?;
736
737                                        backfill_stream = select_with_strategy(
738                                            input.by_ref().map(Either::Left),
739                                            reader.map(Either::Right),
740                                            select_strategy,
741                                        );
742                                    }
743                                }
744                            }
745                            Message::Chunk(chunk) => {
746                                // We need to iterate over all rows because there might be multiple splits in a chunk.
747                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
748                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
749
750                                for (i, (_, row)) in chunk.rows().enumerate() {
751                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
752                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
753                                    let vis = backfill_stage.handle_upstream_row(split, offset);
754                                    new_vis.set(i, vis);
755                                }
756                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
757                                let new_vis = new_vis.finish();
758                                if new_vis.count_ones() != 0 {
759                                    let new_chunk = chunk.clone_with_vis(new_vis);
760                                    yield Message::Chunk(new_chunk);
761                                }
762                            }
763                            Message::Watermark(_) => {
764                                // Ignore watermark during backfill.
765                            }
766                        }
767                    }
768                    // backfill
769                    Either::Right(msg) => {
770                        let chunk = msg?;
771
772                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
773                            // Pause to let barrier catch up via backpressure of snapshot stream.
774                            pause_control.self_pause();
775                            pause_reader!();
776
777                            // Exceeds the max wait barrier time, the source will be paused.
778                            // Currently we can guarantee the
779                            // source is not paused since it received stream
780                            // chunks.
781                            tracing::warn!(
782                                "source {} paused, wait barrier for {:?}",
783                                self.info.identity,
784                                last_barrier_time.elapsed()
785                            );
786
787                            // Only update `max_wait_barrier_time_ms` to capture
788                            // `barrier_interval_ms`
789                            // changes here to avoid frequently accessing the shared
790                            // `system_params`.
791                            max_wait_barrier_time_ms =
792                                self.system_params.load().barrier_interval_ms() as u128
793                                    * WAIT_BARRIER_MULTIPLE_TIMES;
794                        }
795                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
796
797                        for (i, (_, row)) in chunk.rows().enumerate() {
798                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
799                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
800                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
801                            new_vis.set(i, vis);
802                        }
803
804                        let new_vis = new_vis.finish();
805                        let card = new_vis.count_ones();
806                        if card != 0 {
807                            let new_chunk = chunk.clone_with_vis(new_vis);
808                            yield Message::Chunk(new_chunk);
809                            source_backfill_row_count.inc_by(card as u64);
810                        }
811                    }
812                }
813            }
814        }
815
816        std::mem::drop(backfill_stream);
817        let mut states = backfill_stage.states;
818        // Make sure `Finished` state is persisted.
819        self.backfill_state_store.set_states(states.clone()).await?;
820
821        // All splits finished backfilling. Now we only forward the source data.
822        #[for_await]
823        for msg in input {
824            let msg = msg?;
825            match msg {
826                Message::Barrier(barrier) => {
827                    let mut split_changed = None;
828                    if let Some(ref mutation) = barrier.mutation.as_deref() {
829                        match mutation {
830                            Mutation::Pause | Mutation::Resume => {
831                                // We don't need to do anything. Handled by upstream.
832                            }
833                            Mutation::SourceChangeSplit(actor_splits) => {
834                                tracing::info!(
835                                    actor_splits = ?actor_splits,
836                                    "source change split received"
837                                );
838                                split_changed = actor_splits
839                                    .get(&self.actor_ctx.id)
840                                    .cloned()
841                                    .map(|target_splits| (target_splits, &mut states, true));
842                            }
843                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
844                                split_changed = actor_splits
845                                    .get(&self.actor_ctx.id)
846                                    .cloned()
847                                    .map(|target_splits| (target_splits, &mut states, false));
848                            }
849                            _ => {}
850                        }
851                    }
852                    self.backfill_state_store.commit(barrier.epoch).await?;
853                    let barrier_epoch = barrier.epoch;
854                    yield Message::Barrier(barrier);
855
856                    if let Some((target_splits, state, should_trim_state)) = split_changed {
857                        self.apply_split_change_forward_stage_after_yield_barrier(
858                            barrier_epoch,
859                            target_splits,
860                            state,
861                            should_trim_state,
862                        )
863                        .await?;
864                    }
865                }
866                Message::Chunk(chunk) => {
867                    yield Message::Chunk(chunk);
868                }
869                Message::Watermark(watermark) => {
870                    yield Message::Watermark(watermark);
871                }
872            }
873        }
874    }
875
876    /// When we should call `progress.finish()` to let blocking DDL return.
877    /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
878    ///
879    /// Note: split migration (online scaling) is related with progress tracking.
880    /// - For foreground DDL, scaling is not allowed before progress is finished.
881    /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
882    ///
883    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
884    fn should_report_finished(&self, states: &BackfillStates) -> bool {
885        states.values().all(|state| {
886            matches!(
887                state.state,
888                BackfillState::Finished | BackfillState::SourceCachingUp(_)
889            )
890        })
891    }
892
893    /// All splits entered `Finished` state.
894    ///
895    /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
896    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
897    /// this actor, we still need to backfill it.
898    ///
899    /// Note: at the beginning, the actor will only read the state written by itself.
900    /// It needs to _wait until it can read all actors' written data_.
901    /// i.e., wait for the second checkpoint has been available.
902    ///
903    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
904    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
905        Ok(states
906            .values()
907            .all(|state| matches!(state.state, BackfillState::Finished))
908            && self
909                .backfill_state_store
910                .scan_may_stale()
911                .await?
912                .into_iter()
913                .all(|state| matches!(state.state, BackfillState::Finished)))
914    }
915
916    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
917    async fn apply_split_change_after_yield_barrier(
918        &mut self,
919        barrier_epoch: EpochPair,
920        stage: &mut BackfillStage,
921        to_apply_mutation: ApplyMutationAfterBarrier,
922    ) -> StreamExecutorResult<bool> {
923        match to_apply_mutation {
924            ApplyMutationAfterBarrier::SourceChangeSplit {
925                target_splits,
926                should_trim_state,
927            } => {
928                self.source_split_change_count.inc();
929                {
930                    if self
931                        .update_state_if_changed(
932                            barrier_epoch,
933                            target_splits,
934                            stage,
935                            should_trim_state,
936                        )
937                        .await?
938                    {
939                        // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
940                        Ok(true)
941                    } else {
942                        Ok(false)
943                    }
944                }
945            }
946            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
947        }
948    }
949
950    /// Returns `true` if split changed. Otherwise `false`.
951    async fn update_state_if_changed(
952        &mut self,
953        barrier_epoch: EpochPair,
954        target_splits: Vec<SplitImpl>,
955        stage: &mut BackfillStage,
956        should_trim_state: bool,
957    ) -> StreamExecutorResult<bool> {
958        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
959
960        let mut split_changed = false;
961        // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
962        // Will be set to target_state in the end.
963        let old_states = std::mem::take(&mut stage.states);
964        let committed_reader = self
965            .backfill_state_store
966            .new_committed_reader(barrier_epoch)
967            .await?;
968        // Iterate over the target (assigned) splits
969        // - check if any new splits are added
970        // - build target_state
971        for split in &target_splits {
972            let split_id = split.id();
973            if let Some(s) = old_states.get(&split_id) {
974                target_state.insert(split_id, s.clone());
975            } else {
976                split_changed = true;
977
978                let backfill_state = committed_reader
979                    .try_recover_from_state_store(&split_id)
980                    .await?;
981                match backfill_state {
982                    None => {
983                        // Newly added split. We don't need to backfill.
984                        // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
985                        target_state.insert(
986                            split_id,
987                            BackfillStateWithProgress {
988                                state: BackfillState::Finished,
989                                num_consumed_rows: 0,
990                                target_offset: None,
991                            },
992                        );
993                    }
994                    Some(backfill_state) => {
995                        // Migrated split. Backfill if unfinished.
996                        target_state.insert(split_id, backfill_state);
997                    }
998                }
999            }
1000        }
1001
1002        // Checks dropped splits
1003        for existing_split_id in old_states.keys() {
1004            if !target_state.contains_key(existing_split_id) {
1005                tracing::info!("split dropping detected: {}", existing_split_id);
1006                split_changed = true;
1007            }
1008        }
1009
1010        if split_changed {
1011            let dropped_splits = stage
1012                .states
1013                .extract_if(|split_id, _| !target_state.contains_key(split_id))
1014                .map(|(split_id, _)| split_id);
1015
1016            if should_trim_state {
1017                // trim dropped splits' state
1018                self.backfill_state_store.trim_state(dropped_splits).await?;
1019            }
1020            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
1021        } else {
1022            debug_assert_eq!(old_states, target_state);
1023        }
1024        stage.states = target_state;
1025        stage.splits = target_splits;
1026        stage.debug_assert_consistent();
1027        Ok(split_changed)
1028    }
1029
1030    /// For split change during forward stage, all newly added splits should be already finished.
1031    // We just need to update the state store if necessary.
1032    async fn apply_split_change_forward_stage_after_yield_barrier(
1033        &mut self,
1034        barrier_epoch: EpochPair,
1035        target_splits: Vec<SplitImpl>,
1036        states: &mut BackfillStates,
1037        should_trim_state: bool,
1038    ) -> StreamExecutorResult<()> {
1039        self.source_split_change_count.inc();
1040        {
1041            self.update_state_if_changed_forward_stage(
1042                barrier_epoch,
1043                target_splits,
1044                states,
1045                should_trim_state,
1046            )
1047            .await?;
1048        }
1049
1050        Ok(())
1051    }
1052
1053    async fn update_state_if_changed_forward_stage(
1054        &mut self,
1055        barrier_epoch: EpochPair,
1056        target_splits: Vec<SplitImpl>,
1057        states: &mut BackfillStates,
1058        should_trim_state: bool,
1059    ) -> StreamExecutorResult<()> {
1060        let target_splits: HashSet<SplitId> =
1061            target_splits.into_iter().map(|split| split.id()).collect();
1062
1063        let mut split_changed = false;
1064        let mut newly_added_splits = vec![];
1065
1066        let committed_reader = self
1067            .backfill_state_store
1068            .new_committed_reader(barrier_epoch)
1069            .await?;
1070
1071        // Checks added splits
1072        for split_id in &target_splits {
1073            if !states.contains_key(split_id) {
1074                split_changed = true;
1075
1076                let backfill_state = committed_reader
1077                    .try_recover_from_state_store(split_id)
1078                    .await?;
1079                match &backfill_state {
1080                    None => {
1081                        // Newly added split. We don't need to backfill!
1082                        newly_added_splits.push(split_id.clone());
1083                    }
1084                    Some(backfill_state) => {
1085                        // Migrated split. It should also be finished since we are in forwarding stage.
1086                        match backfill_state.state {
1087                            BackfillState::Finished => {}
1088                            _ => {
1089                                return Err(anyhow::anyhow!(
1090                                    "Unexpected backfill state: {:?}",
1091                                    backfill_state
1092                                )
1093                                .into());
1094                            }
1095                        }
1096                    }
1097                }
1098                states.insert(
1099                    split_id.clone(),
1100                    backfill_state.unwrap_or(BackfillStateWithProgress {
1101                        state: BackfillState::Finished,
1102                        num_consumed_rows: 0,
1103                        target_offset: None,
1104                    }),
1105                );
1106            }
1107        }
1108
1109        // Checks dropped splits
1110        for existing_split_id in states.keys() {
1111            if !target_splits.contains(existing_split_id) {
1112                tracing::info!("split dropping detected: {}", existing_split_id);
1113                split_changed = true;
1114            }
1115        }
1116
1117        if split_changed {
1118            tracing::info!(
1119                target_splits = ?target_splits,
1120                "apply split change"
1121            );
1122
1123            let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1124
1125            if should_trim_state {
1126                // trim dropped splits' state
1127                self.backfill_state_store
1128                    .trim_state(dropped_splits.map(|(k, _v)| k))
1129                    .await?;
1130            }
1131
1132            // For migrated splits, and existing splits, we do not need to update
1133            // state store, but only for newly added splits.
1134            self.backfill_state_store
1135                .set_states(
1136                    newly_added_splits
1137                        .into_iter()
1138                        .map(|split_id| {
1139                            (
1140                                split_id,
1141                                BackfillStateWithProgress {
1142                                    state: BackfillState::Finished,
1143                                    num_consumed_rows: 0,
1144                                    target_offset: None,
1145                                },
1146                            )
1147                        })
1148                        .collect(),
1149                )
1150                .await?;
1151        }
1152
1153        Ok(())
1154    }
1155}
1156
/// Orders two Kafka offsets numerically rather than lexicographically (so `"9" < "10"`).
///
/// # Panics
///
/// Panics if either offset does not parse as an `i64`.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let to_num = |offset: &str| offset.parse::<i64>().unwrap();
    to_num(a).cmp(&to_num(b))
}
1162
1163impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
1164    fn execute(self: Box<Self>) -> BoxedMessageStream {
1165        self.inner.execute(self.input).boxed()
1166    }
1167}
1168
1169impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1170    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1171        f.debug_struct("SourceBackfillExecutor")
1172            .field("source_id", &self.source_id)
1173            .field("column_ids", &self.column_ids)
1174            .field("stream_key", &self.info.stream_key)
1175            .finish()
1176    }
1177}
1178
1179struct PauseControl {
1180    // Paused due to backfill order control
1181    backfill_paused: bool,
1182    // Paused due to self-pause, e.g. let barrier catch up
1183    self_paused: bool,
1184    // Paused due to Pause command from meta, pause_on_next_bootstrap
1185    command_paused: bool,
1186    // reader paused
1187    reader_paused: bool,
1188}
1189
1190impl PauseControl {
1191    fn new() -> Self {
1192        Self {
1193            backfill_paused: false,
1194            self_paused: false,
1195            command_paused: false,
1196            reader_paused: false,
1197        }
1198    }
1199
1200    fn is_paused(&self) -> bool {
1201        self.backfill_paused || self.command_paused || self.self_paused
1202    }
1203
1204    /// returns whether we need to pause the reader.
1205    fn backfill_pause(&mut self) {
1206        if self.backfill_paused {
1207            tracing::warn!("backfill_pause invoked twice");
1208        }
1209        self.backfill_paused = true;
1210    }
1211
1212    /// returns whether we need to resume the reader.
1213    /// same precedence as command.
1214    fn backfill_resume(&mut self) -> bool {
1215        if !self.backfill_paused {
1216            tracing::warn!("backfill_resume invoked twice");
1217        }
1218        !self.command_paused
1219    }
1220
1221    /// returns whether we need to pause the reader.
1222    fn self_pause(&mut self) {
1223        assert!(
1224            !self.backfill_paused,
1225            "backfill stream should not be read when backfill_pause is set"
1226        );
1227        assert!(
1228            !self.command_paused,
1229            "backfill stream should not be read when command_pause is set"
1230        );
1231        if self.self_paused {
1232            tracing::warn!("self_pause invoked twice");
1233        }
1234        self.self_paused = true;
1235    }
1236
1237    /// returns whether we need to resume the reader.
1238    /// `self_resume` has the lowest precedence,
1239    /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
1240    fn self_resume(&mut self) -> bool {
1241        self.self_paused = false;
1242        !(self.backfill_paused || self.command_paused)
1243    }
1244
1245    /// returns whether we need to pause the reader.
1246    fn command_pause(&mut self) {
1247        if self.command_paused {
1248            tracing::warn!("command_pause invoked twice");
1249        }
1250        self.command_paused = true;
1251    }
1252
1253    /// returns whether we need to resume the reader.
1254    /// same precedence as backfill.
1255    fn command_resume(&mut self) -> bool {
1256        if !self.command_paused {
1257            tracing::warn!("command_resume invoked twice");
1258        }
1259        self.command_paused = false;
1260        !self.backfill_paused
1261    }
1262}