risingwave_stream/executor/source/
source_backfill_executor.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::ColumnId;
25use risingwave_common::id::SourceId;
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
27use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
28use risingwave_common::system_param::reader::SystemParamsRead;
29use risingwave_common::types::JsonbVal;
30use risingwave_common::util::epoch::EpochPair;
31use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
32use risingwave_connector::source::{
33    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
34    SplitMetaData,
35};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::common::ThrottleType;
38use risingwave_storage::store::TryWaitEpochOptions;
39use serde::{Deserialize, Serialize};
40use thiserror_ext::AsReport;
41
42use super::executor_core::StreamSourceCore;
43use super::source_backfill_state_table::BackfillStateTableHandler;
44use super::{apply_rate_limit, get_split_offset_col_idx};
45use crate::common::rate_limit::limited_chunk_size;
46use crate::executor::UpdateMutation;
47use crate::executor::prelude::*;
48use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
49use crate::task::CreateMviewProgressReporter;
50
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
    /// Backfilling is in progress; the payload is the last backfilled offset (inclusive).
    /// `None` means not started yet. It's the initial state.
    /// XXX: perhaps we can also set to low-watermark instead of `None`
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
    SourceCachingUp(String),
    /// Backfill for this split is done; upstream rows flow through unfiltered.
    Finished,
}
/// Per-split backfill progress, keyed by split id.
pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
61
/// Only `state` field is the real state for fail-over.
/// Other fields are for observability (but we still need to persist them).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct BackfillStateWithProgress {
    /// The fail-over-relevant position in the backfill state machine.
    pub state: BackfillState,
    /// Number of rows consumed so far during backfill (observability only).
    pub num_consumed_rows: u64,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
    /// so that we can finish backfilling even when upstream doesn't emit any data.
    pub target_offset: Option<String>,
}
73
74impl BackfillStateWithProgress {
75    pub fn encode_to_json(self) -> JsonbVal {
76        serde_json::to_value(self).unwrap().into()
77    }
78
79    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
80        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
81    }
82}
83
/// Pairs the backfill executor logic with its upstream input executor.
pub struct SourceBackfillExecutor<S: StateStore> {
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    pub input: Executor,
}
89
pub struct SourceBackfillExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Streaming source for external
    source_id: SourceId,
    source_name: String,
    column_ids: Vec<ColumnId>,
    /// Taken (`Option::take`) once when the source reader is first built in `execute`.
    source_desc_builder: Option<SourceDescBuilder>,
    /// Handler over the state table that persists per-split backfill progress.
    backfill_state_store: BackfillStateTableHandler<S>,

    /// Metrics for monitor.
    metrics: Arc<StreamingMetrics>,
    /// Counter metric labeled by source/actor/fragment for split-change events.
    source_split_change_count: LabelGuardedIntCounter,

    /// System parameter reader to read barrier interval
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    /// Reports backfill progress (and completion) for the creating materialized view.
    progress: CreateMviewProgressReporter,
}
115
/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
#[derive(Debug)]
struct BackfillStage {
    /// Backfill state and progress for every split assigned to this actor.
    states: BackfillStates,
    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}
129
/// A mutation-derived action that is deferred until after the carrying barrier
/// has been yielded downstream (see `apply_split_change_after_yield_barrier`).
enum ApplyMutationAfterBarrier {
    /// The splits assigned to this actor changed.
    SourceChangeSplit {
        target_splits: Vec<SplitImpl>,
        /// Whether state for splits no longer assigned should be trimmed from the state table.
        should_trim_state: bool,
    },
    /// Connector properties were changed via `Mutation::ConnectorPropsChange`.
    ConnectorPropsChange,
}
137
impl BackfillStage {
    /// Total number of rows consumed across all splits, used for progress reporting.
    fn total_backfilled_rows(&self) -> u64 {
        self.states.values().map(|s| s.num_consumed_rows).sum()
    }

    /// Debug-build assertion that `states` and `splits` cover exactly the same split ids.
    fn debug_assert_consistent(&self) {
        if cfg!(debug_assertions) {
            let all_splits: HashSet<_> = self.splits.iter().map(|split| split.id()).collect();
            assert_eq!(
                self.states.keys().cloned().collect::<HashSet<_>>(),
                all_splits
            );
        }
    }

    /// Get unfinished splits with latest offsets according to the backfill states.
    ///
    /// Only splits still in `Backfilling` are returned; a split with a known backfill
    /// offset is cloned and advanced to that offset so a rebuilt reader resumes from
    /// where backfill left off.
    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
        let mut unfinished_splits = Vec::new();
        for split in &self.splits {
            // unwrap: `states` contains all splits (see `debug_assert_consistent`).
            let state = &self.states.get(split.id().as_ref()).unwrap().state;
            match state {
                BackfillState::Backfilling(Some(offset)) => {
                    let mut updated_split = split.clone();
                    updated_split.update_in_place(offset.clone())?;
                    unfinished_splits.push(updated_split);
                }
                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
            }
        }
        Ok(unfinished_splits)
    }

    /// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible.
    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
        let mut vis = false;
        let state = self.states.get_mut(split_id).unwrap();
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(None) => {
                // backfilling for this split is not started yet. Ignore this row
            }
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // continue backfilling. Ignore this row
                    }
                    Ordering::Equal => {
                        // backfilling for this split is finished just right.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // backfilling for this split produced more data than current source's progress.
                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
                    }
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // Source caught up, but doesn't contain the last backfilled row.
                        // This may happen e.g., if Kafka performed compaction.
                        vis = true;
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Equal => {
                        // Source just caught up with backfilling.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // Source is still behind backfilling.
                    }
                }
            }
            BackfillState::Finished => {
                vis = true;
                // This split's backfilling is finished, we are waiting for other splits
            }
        }
        // While still backfilling, every upstream row advances the target offset,
        // overriding any initial value fetched from the connector.
        if matches!(state_inner, BackfillState::Backfilling(_)) {
            state.target_offset = Some(offset.to_owned());
        }
        // An upstream row is only visible once this split's backfill is finished.
        if vis {
            debug_assert_eq!(*state_inner, BackfillState::Finished);
        }
        vis
    }

    /// Updates backfill states and returns whether the row backfilled from external system is visible.
    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
        let state = self.states.get_mut(split_id).unwrap();
        state.num_consumed_rows += 1;
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
                    // and dropping duplicated messages.
                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
                    // In this case, upstream hasn't reached the target_offset yet.
                    //
                    // Note2: after this, all following rows in the current chunk will be invisible.
                    //
                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
                    // keep backfilling.
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
                true
            }
            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
                // backfilling stopped. ignore
                false
            }
        }
    }
}
259
260impl<S: StateStore> SourceBackfillExecutorInner<S> {
261    #[expect(clippy::too_many_arguments)]
262    pub fn new(
263        actor_ctx: ActorContextRef,
264        info: ExecutorInfo,
265        stream_source_core: StreamSourceCore<S>,
266        metrics: Arc<StreamingMetrics>,
267        system_params: SystemParamsReaderRef,
268        backfill_state_store: BackfillStateTableHandler<S>,
269        rate_limit_rps: Option<u32>,
270        progress: CreateMviewProgressReporter,
271    ) -> Self {
272        let source_split_change_count = metrics
273            .source_split_change_count
274            .with_guarded_label_values(&[
275                &stream_source_core.source_id.to_string(),
276                &stream_source_core.source_name,
277                &actor_ctx.id.to_string(),
278                &actor_ctx.fragment_id.to_string(),
279            ]);
280
281        Self {
282            actor_ctx,
283            info,
284            source_id: stream_source_core.source_id,
285            source_name: stream_source_core.source_name,
286            column_ids: stream_source_core.column_ids,
287            source_desc_builder: stream_source_core.source_desc_builder,
288            backfill_state_store,
289            metrics,
290            source_split_change_count,
291            system_params,
292            rate_limit_rps,
293            progress,
294        }
295    }
296
297    async fn build_stream_source_reader(
298        &self,
299        source_desc: &SourceDesc,
300        splits: Vec<SplitImpl>,
301    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
302        let column_ids = source_desc
303            .columns
304            .iter()
305            .map(|column_desc| column_desc.column_id)
306            .collect_vec();
307        let source_ctx = SourceContext::new(
308            self.actor_ctx.id,
309            self.source_id,
310            self.actor_ctx.fragment_id,
311            self.source_name.clone(),
312            source_desc.metrics.clone(),
313            SourceCtrlOpts {
314                chunk_size: limited_chunk_size(self.rate_limit_rps),
315                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
316            },
317            source_desc.source.config.clone(),
318            None,
319        );
320
321        // We will check watermark to decide whether we need to backfill.
322        // e.g., when there's a Kafka topic-partition without any data,
323        // we don't need to backfill at all. But if we do not check here,
324        // the executor can only know it's finished when data coming in.
325        // For blocking DDL, this would be annoying.
326
327        let (stream, res) = source_desc
328            .source
329            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
330            .await
331            .map_err(StreamExecutorError::connector_error)?;
332        Ok((
333            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
334            res.backfill_info,
335        ))
336    }
337
338    #[try_stream(ok = Message, error = StreamExecutorError)]
339    async fn execute(mut self, input: Executor) {
340        let mut input = input.execute();
341
342        // Poll the upstream to get the first barrier.
343        let barrier = expect_first_barrier(&mut input).await?;
344        let first_epoch = barrier.epoch;
345        let owned_splits = barrier
346            .initial_split_assignment(self.actor_ctx.id)
347            .unwrap_or(&[])
348            .to_vec();
349
350        let mut pause_control = PauseControl::new();
351        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
352            pause_control.backfill_pause();
353        }
354        if barrier.is_pause_on_startup() {
355            pause_control.command_pause();
356        }
357        yield Message::Barrier(barrier);
358
359        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
360        let mut source_desc = source_desc_builder
361            .build()
362            .map_err(StreamExecutorError::connector_error)?;
363
364        // source backfill only applies to kafka, so we don't need to get pulsar's `message_id_data_idx`.
365        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
366        else {
367            unreachable!("Partition and offset columns must be set.");
368        };
369
370        self.backfill_state_store.init_epoch(first_epoch).await?;
371
372        let mut backfill_states: BackfillStates = HashMap::new();
373        {
374            let committed_reader = self
375                .backfill_state_store
376                .new_committed_reader(first_epoch)
377                .await?;
378            for split in &owned_splits {
379                let split_id = split.id();
380                let backfill_state = committed_reader
381                    .try_recover_from_state_store(&split_id)
382                    .await?
383                    .unwrap_or(BackfillStateWithProgress {
384                        state: BackfillState::Backfilling(None),
385                        num_consumed_rows: 0,
386                        target_offset: None, // init with None
387                    });
388                backfill_states.insert(split_id, backfill_state);
389            }
390        }
391        let mut backfill_stage = BackfillStage {
392            states: backfill_states,
393            splits: owned_splits,
394        };
395        backfill_stage.debug_assert_consistent();
396
397        let (source_chunk_reader, backfill_info) = self
398            .build_stream_source_reader(
399                &source_desc,
400                backfill_stage.get_latest_unfinished_splits()?,
401            )
402            .instrument_await("source_build_reader")
403            .await?;
404        for (split_id, info) in &backfill_info {
405            let state = backfill_stage.states.get_mut(split_id).unwrap();
406            match info {
407                BackfillInfo::NoDataToBackfill => {
408                    state.state = BackfillState::Finished;
409                }
410                BackfillInfo::HasDataToBackfill { latest_offset } => {
411                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
412                    state.target_offset = Some(latest_offset.clone());
413                }
414            }
415        }
416        tracing::debug!(?backfill_stage, "source backfill started");
417
418        fn select_strategy(_: &mut ()) -> PollNext {
419            futures::stream::PollNext::Left
420        }
421
422        // We choose "preferring upstream" strategy here, because:
423        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
424        //   For chunks from upstream, they are simply dropped, so there's no much overhead.
425        //   So possibly this can also affect other running jobs less.
426        // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
427        let mut backfill_stream = select_with_strategy(
428            input.by_ref().map(Either::Left),
429            source_chunk_reader.map(Either::Right),
430            select_strategy,
431        );
432
433        type PausedReader = Option<impl Stream>;
434        let mut paused_reader: PausedReader = None;
435
436        macro_rules! pause_reader {
437            () => {
438                if !pause_control.reader_paused {
439                    let (left, right) = backfill_stream.into_inner();
440                    backfill_stream = select_with_strategy(
441                        left,
442                        futures::stream::pending().boxed().map(Either::Right),
443                        select_strategy,
444                    );
445                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
446                    paused_reader = Some(right);
447                    pause_control.reader_paused = true;
448                }
449            };
450        }
451
452        macro_rules! resume_reader {
453            () => {
454                if pause_control.reader_paused {
455                    backfill_stream = select_with_strategy(
456                        input.by_ref().map(Either::Left),
457                        paused_reader
458                            .take()
459                            .expect("should have paused reader to resume"),
460                        select_strategy,
461                    );
462                    pause_control.reader_paused = false;
463                }
464            };
465        }
466
467        if pause_control.is_paused() {
468            pause_reader!();
469        }
470
471        let state_store = self
472            .backfill_state_store
473            .state_store()
474            .state_store()
475            .clone();
476        let table_id = self.backfill_state_store.state_store().table_id();
477        let mut state_table_initialized = false;
478        {
479            let source_backfill_row_count = self
480                .metrics
481                .source_backfill_row_count
482                .with_guarded_label_values(&[
483                    &self.source_id.to_string(),
484                    &self.source_name,
485                    &self.actor_ctx.id.to_string(),
486                    &self.actor_ctx.fragment_id.to_string(),
487                ]);
488
489            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
490            // milliseconds, considering some other latencies like network and cost in Meta.
491            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
492                as u128
493                * WAIT_BARRIER_MULTIPLE_TIMES;
494            let mut last_barrier_time = Instant::now();
495
496            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
497            'backfill_loop: while let Some(either) = backfill_stream.next().await {
498                match either {
499                    // Upstream
500                    Either::Left(msg) => {
501                        let Ok(msg) = msg else {
502                            let e = msg.unwrap_err();
503                            tracing::error!(
504                                error = ?e.as_report(),
505                                source_id = %self.source_id,
506                                "stream source reader error",
507                            );
508                            GLOBAL_ERROR_METRICS.user_source_error.report([
509                                "SourceReaderError".to_owned(),
510                                self.source_id.to_string(),
511                                self.source_name.clone(),
512                                self.actor_ctx.fragment_id.to_string(),
513                            ]);
514
515                            let (reader, _backfill_info) = self
516                                .build_stream_source_reader(
517                                    &source_desc,
518                                    backfill_stage.get_latest_unfinished_splits()?,
519                                )
520                                .await?;
521
522                            backfill_stream = select_with_strategy(
523                                input.by_ref().map(Either::Left),
524                                reader.map(Either::Right),
525                                select_strategy,
526                            );
527                            continue;
528                        };
529                        match msg {
530                            Message::Barrier(barrier) => {
531                                last_barrier_time = Instant::now();
532
533                                if pause_control.self_resume() {
534                                    resume_reader!();
535                                }
536
537                                let mut maybe_muatation = None;
538                                if let Some(ref mutation) = barrier.mutation.as_deref() {
539                                    match mutation {
540                                        Mutation::Pause => {
541                                            // pause_reader should not be invoked consecutively more than once.
542                                            pause_control.command_pause();
543                                            pause_reader!();
544                                        }
545                                        Mutation::Resume => {
546                                            // pause_reader.take should not be invoked consecutively more than once.
547                                            if pause_control.command_resume() {
548                                                resume_reader!();
549                                            }
550                                        }
551                                        Mutation::StartFragmentBackfill { fragment_ids } => {
552                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
553                                                && pause_control.backfill_resume()
554                                            {
555                                                resume_reader!();
556                                            }
557                                        }
558                                        Mutation::SourceChangeSplit(actor_splits) => {
559                                            tracing::info!(
560                                                actor_splits = ?actor_splits,
561                                                "source change split received"
562                                            );
563                                            maybe_muatation = actor_splits
564                                                .get(&self.actor_ctx.id)
565                                                .cloned()
566                                                .map(|target_splits| {
567                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
568                                                        target_splits,
569                                                        should_trim_state: true,
570                                                    }
571                                                });
572                                        }
573                                        Mutation::Update(UpdateMutation {
574                                            actor_splits, ..
575                                        }) => {
576                                            maybe_muatation = actor_splits
577                                                .get(&self.actor_ctx.id)
578                                                .cloned()
579                                                .map(|target_splits| {
580                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
581                                                        target_splits,
582                                                        should_trim_state: false,
583                                                    }
584                                                });
585                                        }
586                                        Mutation::ConnectorPropsChange(maybe_mutation) => {
587                                            if let Some(props_plaintext) =
588                                                maybe_mutation.get(&self.source_id.as_raw_id())
589                                            {
590                                                source_desc
591                                                    .update_reader(props_plaintext.clone())?;
592
593                                                maybe_muatation = Some(
594                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
595                                                );
596                                            }
597                                        }
598                                        Mutation::Throttle(fragment_to_apply) => {
599                                            if let Some(entry) =
600                                                fragment_to_apply.get(&self.actor_ctx.fragment_id)
601                                                && entry.throttle_type() == ThrottleType::Backfill
602                                                && entry.rate_limit != self.rate_limit_rps
603                                            {
604                                                tracing::info!(
605                                                    "updating rate limit from {:?} to {:?}",
606                                                    self.rate_limit_rps,
607                                                    entry.rate_limit
608                                                );
609                                                self.rate_limit_rps = entry.rate_limit;
610                                                // rebuild reader
611                                                let (reader, _backfill_info) = self
612                                                    .build_stream_source_reader(
613                                                        &source_desc,
614                                                        backfill_stage
615                                                            .get_latest_unfinished_splits()?,
616                                                    )
617                                                    .await?;
618
619                                                backfill_stream = select_with_strategy(
620                                                    input.by_ref().map(Either::Left),
621                                                    reader.map(Either::Right),
622                                                    select_strategy,
623                                                );
624                                            }
625                                        }
626                                        _ => {}
627                                    }
628                                }
629                                async fn rebuild_reader_on_split_changed(
630                                    this: &SourceBackfillExecutorInner<impl StateStore>,
631                                    backfill_stage: &BackfillStage,
632                                    source_desc: &SourceDesc,
633                                ) -> StreamExecutorResult<BoxSourceChunkStream>
634                                {
635                                    // rebuild backfill_stream
636                                    // Note: we don't put this part in a method, due to some complex lifetime issues.
637
638                                    let latest_unfinished_splits =
639                                        backfill_stage.get_latest_unfinished_splits()?;
640                                    tracing::info!(
641                                        "actor {:?} apply source split change to {:?}",
642                                        this.actor_ctx.id,
643                                        latest_unfinished_splits
644                                    );
645
646                                    // Replace the source reader with a new one of the new state.
647                                    let (reader, _backfill_info) = this
648                                        .build_stream_source_reader(
649                                            source_desc,
650                                            latest_unfinished_splits,
651                                        )
652                                        .await?;
653
654                                    Ok(reader)
655                                }
656
657                                self.backfill_state_store
658                                    .set_states(backfill_stage.states.clone())
659                                    .await?;
660                                self.backfill_state_store.commit(barrier.epoch).await?;
661
662                                if self.should_report_finished(&backfill_stage.states) {
663                                    // drop the backfill kafka consumers
664                                    backfill_stream = select_with_strategy(
665                                        input.by_ref().map(Either::Left),
666                                        futures::stream::pending().boxed().map(Either::Right),
667                                        select_strategy,
668                                    );
669
670                                    self.progress.finish(
671                                        barrier.epoch,
672                                        backfill_stage.total_backfilled_rows(),
673                                    );
674
675                                    let barrier_epoch = barrier.epoch;
676                                    let is_checkpoint = barrier.is_checkpoint();
677                                    // yield barrier after reporting progress
678                                    yield Message::Barrier(barrier);
679
680                                    if let Some(to_apply_mutation) = maybe_muatation {
681                                        self.apply_split_change_after_yield_barrier(
682                                            barrier_epoch,
683                                            &mut backfill_stage,
684                                            to_apply_mutation,
685                                        )
686                                        .await?;
687                                    }
688
689                                    if !state_table_initialized {
690                                        if is_checkpoint {
691                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
692                                            // We wait for 2nd epoch
693                                            let epoch = barrier_epoch.prev;
694                                            tracing::info!("waiting for epoch: {}", epoch);
695                                            state_store
696                                                .try_wait_epoch(
697                                                    HummockReadEpoch::Committed(epoch),
698                                                    TryWaitEpochOptions { table_id },
699                                                )
700                                                .await?;
701                                            tracing::info!("finished waiting for epoch: {}", epoch);
702                                            state_table_initialized = true;
703                                        }
704                                    } else {
705                                        // After we reported finished, we still don't exit the loop.
706                                        // Because we need to handle split migration.
707                                        assert!(
708                                            state_table_initialized,
709                                            "state table should be initialized before checking backfill finished"
710                                        );
711                                        if self.backfill_finished(&backfill_stage.states).await? {
712                                            tracing::info!("source backfill finished");
713                                            break 'backfill_loop;
714                                        }
715                                    }
716                                } else {
717                                    self.progress.update_for_source_backfill(
718                                        barrier.epoch,
719                                        backfill_stage.total_backfilled_rows(),
720                                    );
721
722                                    let barrier_epoch = barrier.epoch;
723                                    // yield barrier after reporting progress
724                                    yield Message::Barrier(barrier);
725
726                                    if let Some(to_apply_mutation) = maybe_muatation
727                                        && self
728                                            .apply_split_change_after_yield_barrier(
729                                                barrier_epoch,
730                                                &mut backfill_stage,
731                                                to_apply_mutation,
732                                            )
733                                            .await?
734                                    {
735                                        let reader = rebuild_reader_on_split_changed(
736                                            &self,
737                                            &backfill_stage,
738                                            &source_desc,
739                                        )
740                                        .await?;
741
742                                        backfill_stream = select_with_strategy(
743                                            input.by_ref().map(Either::Left),
744                                            reader.map(Either::Right),
745                                            select_strategy,
746                                        );
747                                    }
748                                }
749                            }
750                            Message::Chunk(chunk) => {
751                                // We need to iterate over all rows because there might be multiple splits in a chunk.
752                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
753                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
754
755                                for (i, (_, row)) in chunk.rows().enumerate() {
756                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
757                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
758                                    let vis = backfill_stage.handle_upstream_row(split, offset);
759                                    new_vis.set(i, vis);
760                                }
761                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
762                                let new_vis = new_vis.finish();
763                                if new_vis.count_ones() != 0 {
764                                    let new_chunk = chunk.clone_with_vis(new_vis);
765                                    yield Message::Chunk(new_chunk);
766                                }
767                            }
768                            Message::Watermark(_) => {
769                                // Ignore watermark during backfill.
770                            }
771                        }
772                    }
773                    // backfill
774                    Either::Right(msg) => {
775                        let chunk = msg?;
776
777                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
778                            // Pause to let barrier catch up via backpressure of snapshot stream.
779                            pause_control.self_pause();
780                            pause_reader!();
781
782                            // Exceeds the max wait barrier time, the source will be paused.
783                            // Currently we can guarantee the
784                            // source is not paused since it received stream
785                            // chunks.
786                            tracing::warn!(
787                                "source {} paused, wait barrier for {:?}",
788                                self.info.identity,
789                                last_barrier_time.elapsed()
790                            );
791
792                            // Only update `max_wait_barrier_time_ms` to capture
793                            // `barrier_interval_ms`
794                            // changes here to avoid frequently accessing the shared
795                            // `system_params`.
796                            max_wait_barrier_time_ms =
797                                self.system_params.load().barrier_interval_ms() as u128
798                                    * WAIT_BARRIER_MULTIPLE_TIMES;
799                        }
800                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
801
802                        for (i, (_, row)) in chunk.rows().enumerate() {
803                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
804                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
805                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
806                            new_vis.set(i, vis);
807                        }
808
809                        let new_vis = new_vis.finish();
810                        let card = new_vis.count_ones();
811                        if card != 0 {
812                            let new_chunk = chunk.clone_with_vis(new_vis);
813                            yield Message::Chunk(new_chunk);
814                            source_backfill_row_count.inc_by(card as u64);
815                        }
816                    }
817                }
818            }
819        }
820
821        std::mem::drop(backfill_stream);
822        let mut states = backfill_stage.states;
823        // Make sure `Finished` state is persisted.
824        self.backfill_state_store.set_states(states.clone()).await?;
825
826        // All splits finished backfilling. Now we only forward the source data.
827        #[for_await]
828        for msg in input {
829            let msg = msg?;
830            match msg {
831                Message::Barrier(barrier) => {
832                    let mut split_changed = None;
833                    if let Some(ref mutation) = barrier.mutation.as_deref() {
834                        match mutation {
835                            Mutation::Pause | Mutation::Resume => {
836                                // We don't need to do anything. Handled by upstream.
837                            }
838                            Mutation::SourceChangeSplit(actor_splits) => {
839                                tracing::info!(
840                                    actor_splits = ?actor_splits,
841                                    "source change split received"
842                                );
843                                split_changed = actor_splits
844                                    .get(&self.actor_ctx.id)
845                                    .cloned()
846                                    .map(|target_splits| (target_splits, &mut states, true));
847                            }
848                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
849                                split_changed = actor_splits
850                                    .get(&self.actor_ctx.id)
851                                    .cloned()
852                                    .map(|target_splits| (target_splits, &mut states, false));
853                            }
854                            _ => {}
855                        }
856                    }
857                    self.backfill_state_store.commit(barrier.epoch).await?;
858                    let barrier_epoch = barrier.epoch;
859                    yield Message::Barrier(barrier);
860
861                    if let Some((target_splits, state, should_trim_state)) = split_changed {
862                        self.apply_split_change_forward_stage_after_yield_barrier(
863                            barrier_epoch,
864                            target_splits,
865                            state,
866                            should_trim_state,
867                        )
868                        .await?;
869                    }
870                }
871                Message::Chunk(chunk) => {
872                    yield Message::Chunk(chunk);
873                }
874                Message::Watermark(watermark) => {
875                    yield Message::Watermark(watermark);
876                }
877            }
878        }
879    }
880
881    /// When we should call `progress.finish()` to let blocking DDL return.
882    /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
883    ///
884    /// Note: split migration (online scaling) is related with progress tracking.
885    /// - For foreground DDL, scaling is not allowed before progress is finished.
886    /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
887    ///
888    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
889    fn should_report_finished(&self, states: &BackfillStates) -> bool {
890        states.values().all(|state| {
891            matches!(
892                state.state,
893                BackfillState::Finished | BackfillState::SourceCachingUp(_)
894            )
895        })
896    }
897
898    /// All splits entered `Finished` state.
899    ///
900    /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
901    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
902    /// this actor, we still need to backfill it.
903    ///
904    /// Note: at the beginning, the actor will only read the state written by itself.
905    /// It needs to _wait until it can read all actors' written data_.
906    /// i.e., wait for the second checkpoint has been available.
907    ///
908    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
909    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
910        Ok(states
911            .values()
912            .all(|state| matches!(state.state, BackfillState::Finished))
913            && self
914                .backfill_state_store
915                .scan_may_stale()
916                .await?
917                .into_iter()
918                .all(|state| matches!(state.state, BackfillState::Finished)))
919    }
920
921    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
922    async fn apply_split_change_after_yield_barrier(
923        &mut self,
924        barrier_epoch: EpochPair,
925        stage: &mut BackfillStage,
926        to_apply_mutation: ApplyMutationAfterBarrier,
927    ) -> StreamExecutorResult<bool> {
928        match to_apply_mutation {
929            ApplyMutationAfterBarrier::SourceChangeSplit {
930                target_splits,
931                should_trim_state,
932            } => {
933                self.source_split_change_count.inc();
934                {
935                    if self
936                        .update_state_if_changed(
937                            barrier_epoch,
938                            target_splits,
939                            stage,
940                            should_trim_state,
941                        )
942                        .await?
943                    {
944                        // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
945                        Ok(true)
946                    } else {
947                        Ok(false)
948                    }
949                }
950            }
951            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
952        }
953    }
954
955    /// Returns `true` if split changed. Otherwise `false`.
956    async fn update_state_if_changed(
957        &mut self,
958        barrier_epoch: EpochPair,
959        target_splits: Vec<SplitImpl>,
960        stage: &mut BackfillStage,
961        should_trim_state: bool,
962    ) -> StreamExecutorResult<bool> {
963        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
964
965        let mut split_changed = false;
966        // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
967        // Will be set to target_state in the end.
968        let old_states = std::mem::take(&mut stage.states);
969        let committed_reader = self
970            .backfill_state_store
971            .new_committed_reader(barrier_epoch)
972            .await?;
973        // Iterate over the target (assigned) splits
974        // - check if any new splits are added
975        // - build target_state
976        for split in &target_splits {
977            let split_id = split.id();
978            if let Some(s) = old_states.get(&split_id) {
979                target_state.insert(split_id, s.clone());
980            } else {
981                split_changed = true;
982
983                let backfill_state = committed_reader
984                    .try_recover_from_state_store(&split_id)
985                    .await?;
986                match backfill_state {
987                    None => {
988                        // Newly added split. We don't need to backfill.
989                        // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
990                        target_state.insert(
991                            split_id,
992                            BackfillStateWithProgress {
993                                state: BackfillState::Finished,
994                                num_consumed_rows: 0,
995                                target_offset: None,
996                            },
997                        );
998                    }
999                    Some(backfill_state) => {
1000                        // Migrated split. Backfill if unfinished.
1001                        target_state.insert(split_id, backfill_state);
1002                    }
1003                }
1004            }
1005        }
1006
1007        // Checks dropped splits
1008        for existing_split_id in old_states.keys() {
1009            if !target_state.contains_key(existing_split_id) {
1010                tracing::info!("split dropping detected: {}", existing_split_id);
1011                split_changed = true;
1012            }
1013        }
1014
1015        if split_changed {
1016            let dropped_splits = stage
1017                .states
1018                .extract_if(|split_id, _| !target_state.contains_key(split_id))
1019                .map(|(split_id, _)| split_id);
1020
1021            if should_trim_state {
1022                // trim dropped splits' state
1023                self.backfill_state_store.trim_state(dropped_splits).await?;
1024            }
1025            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
1026        } else {
1027            debug_assert_eq!(old_states, target_state);
1028        }
1029        stage.states = target_state;
1030        stage.splits = target_splits;
1031        stage.debug_assert_consistent();
1032        Ok(split_changed)
1033    }
1034
1035    /// For split change during forward stage, all newly added splits should be already finished.
1036    // We just need to update the state store if necessary.
1037    async fn apply_split_change_forward_stage_after_yield_barrier(
1038        &mut self,
1039        barrier_epoch: EpochPair,
1040        target_splits: Vec<SplitImpl>,
1041        states: &mut BackfillStates,
1042        should_trim_state: bool,
1043    ) -> StreamExecutorResult<()> {
1044        self.source_split_change_count.inc();
1045        {
1046            self.update_state_if_changed_forward_stage(
1047                barrier_epoch,
1048                target_splits,
1049                states,
1050                should_trim_state,
1051            )
1052            .await?;
1053        }
1054
1055        Ok(())
1056    }
1057
    /// Forward-stage counterpart of `update_state_if_changed`: reconciles the in-memory
    /// `states` with the newly assigned `target_splits`.
    ///
    /// - A split with no persisted state is genuinely new; since we are already in the
    ///   forward stage it needs no backfill and is recorded as `Finished`.
    /// - A migrated split must already be `Finished`; anything else is an error.
    /// - Dropped splits are extracted from `states`; their state-table entries are
    ///   trimmed when `should_trim_state` is set.
    ///   NOTE(review): `extract_if` is lazy, and it is only consumed when
    ///   `should_trim_state` is true — otherwise dropped splits appear to remain in
    ///   `states`. Confirm this is intended.
    ///
    /// Only the newly added splits are persisted to the state store.
    async fn update_state_if_changed_forward_stage(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        states: &mut BackfillStates,
        should_trim_state: bool,
    ) -> StreamExecutorResult<()> {
        // Only split ids matter in the forward stage; the payloads are not read.
        let target_splits: HashSet<SplitId> =
            target_splits.into_iter().map(|split| split.id()).collect();

        let mut split_changed = false;
        let mut newly_added_splits = vec![];

        // Reader pinned at `barrier_epoch` so recovery only sees committed data.
        let committed_reader = self
            .backfill_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Checks added splits
        for split_id in &target_splits {
            if !states.contains_key(split_id) {
                split_changed = true;

                let backfill_state = committed_reader
                    .try_recover_from_state_store(split_id)
                    .await?;
                match &backfill_state {
                    None => {
                        // Newly added split. We don't need to backfill!
                        newly_added_splits.push(split_id.clone());
                    }
                    Some(backfill_state) => {
                        // Migrated split. It should also be finished since we are in forwarding stage.
                        match backfill_state.state {
                            BackfillState::Finished => {}
                            _ => {
                                return Err(anyhow::anyhow!(
                                    "Unexpected backfill state: {:?}",
                                    backfill_state
                                )
                                .into());
                            }
                        }
                    }
                }
                // Either way, record the split as `Finished` in memory.
                states.insert(
                    split_id.clone(),
                    backfill_state.unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Finished,
                        num_consumed_rows: 0,
                        target_offset: None,
                    }),
                );
            }
        }

        // Checks dropped splits
        for existing_split_id in states.keys() {
            if !target_splits.contains(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                target_splits = ?target_splits,
                "apply split change"
            );

            // Lazy iterator: removal from `states` happens as it is consumed below.
            let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));

            if should_trim_state {
                // trim dropped splits' state
                self.backfill_state_store
                    .trim_state(dropped_splits.map(|(k, _v)| k))
                    .await?;
            }

            // For migrated splits, and existing splits, we do not need to update
            // state store, but only for newly added splits.
            self.backfill_state_store
                .set_states(
                    newly_added_splits
                        .into_iter()
                        .map(|split_id| {
                            (
                                split_id,
                                BackfillStateWithProgress {
                                    state: BackfillState::Finished,
                                    num_consumed_rows: 0,
                                    target_offset: None,
                                },
                            )
                        })
                        .collect(),
                )
                .await?;
        }

        Ok(())
    }
1160}
1161
/// Compares two Kafka offsets numerically (offsets are stored as decimal strings).
///
/// # Panics
/// Panics if either input is not a valid `i64` string; the message includes the
/// offending value to aid debugging (the bare `unwrap` previously gave no context).
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let a = a
        .parse::<i64>()
        .unwrap_or_else(|e| panic!("invalid kafka offset {a:?}: {e}"));
    let b = b
        .parse::<i64>()
        .unwrap_or_else(|e| panic!("invalid kafka offset {b:?}: {e}"));
    a.cmp(&b)
}
1167
impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to the inner executor, feeding it the upstream input stream.
        self.inner.execute(self.input).boxed()
    }
}
1173
1174impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1175    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1176        f.debug_struct("SourceBackfillExecutor")
1177            .field("source_id", &self.source_id)
1178            .field("column_ids", &self.column_ids)
1179            .field("stream_key", &self.info.stream_key)
1180            .finish()
1181    }
1182}
1183
/// Tracks the distinct reasons the backfill source reader may be paused.
///
/// Several reasons can be active at once; the reader should only resume when
/// every blocking reason has been cleared.
struct PauseControl {
    /// Paused due to backfill order control
    backfill_paused: bool,
    /// Paused due to self-pause, e.g. let barrier catch up
    self_paused: bool,
    /// Paused due to Pause command from meta, pause_on_next_bootstrap
    command_paused: bool,
    /// Whether the reader itself is currently paused.
    /// NOTE(review): not consulted by `is_paused`; presumably maintained by the
    /// executor loop outside this view — confirm.
    reader_paused: bool,
}
1194
1195impl PauseControl {
1196    fn new() -> Self {
1197        Self {
1198            backfill_paused: false,
1199            self_paused: false,
1200            command_paused: false,
1201            reader_paused: false,
1202        }
1203    }
1204
1205    fn is_paused(&self) -> bool {
1206        self.backfill_paused || self.command_paused || self.self_paused
1207    }
1208
1209    /// returns whether we need to pause the reader.
1210    fn backfill_pause(&mut self) {
1211        if self.backfill_paused {
1212            tracing::warn!("backfill_pause invoked twice");
1213        }
1214        self.backfill_paused = true;
1215    }
1216
1217    /// returns whether we need to resume the reader.
1218    /// same precedence as command.
1219    fn backfill_resume(&mut self) -> bool {
1220        if !self.backfill_paused {
1221            tracing::warn!("backfill_resume invoked twice");
1222        }
1223        !self.command_paused
1224    }
1225
1226    /// returns whether we need to pause the reader.
1227    fn self_pause(&mut self) {
1228        assert!(
1229            !self.backfill_paused,
1230            "backfill stream should not be read when backfill_pause is set"
1231        );
1232        assert!(
1233            !self.command_paused,
1234            "backfill stream should not be read when command_pause is set"
1235        );
1236        if self.self_paused {
1237            tracing::warn!("self_pause invoked twice");
1238        }
1239        self.self_paused = true;
1240    }
1241
1242    /// returns whether we need to resume the reader.
1243    /// `self_resume` has the lowest precedence,
1244    /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
1245    fn self_resume(&mut self) -> bool {
1246        self.self_paused = false;
1247        !(self.backfill_paused || self.command_paused)
1248    }
1249
1250    /// returns whether we need to pause the reader.
1251    fn command_pause(&mut self) {
1252        if self.command_paused {
1253            tracing::warn!("command_pause invoked twice");
1254        }
1255        self.command_paused = true;
1256    }
1257
1258    /// returns whether we need to resume the reader.
1259    /// same precedence as backfill.
1260    fn command_resume(&mut self) -> bool {
1261        if !self.command_paused {
1262            tracing::warn!("command_resume invoked twice");
1263        }
1264        self.command_paused = false;
1265        !self.backfill_paused
1266    }
1267}