risingwave_stream/executor/source/
source_backfill_executor.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::ColumnId;
25use risingwave_common::id::SourceId;
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
27use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
28use risingwave_common::system_param::reader::SystemParamsRead;
29use risingwave_common::types::JsonbVal;
30use risingwave_common::util::epoch::EpochPair;
31use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
32use risingwave_connector::source::{
33    BackfillInfo, BoxSourceReaderEventStream, SourceContext, SourceCtrlOpts, SourceReaderEvent,
34    SplitId, SplitImpl, SplitMetaData,
35};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::common::ThrottleType;
38use risingwave_storage::store::TryWaitEpochOptions;
39use serde::{Deserialize, Serialize};
40use thiserror_ext::AsReport;
41
42use super::executor_core::StreamSourceCore;
43use super::source_backfill_state_table::BackfillStateTableHandler;
44use super::{apply_rate_limit_to_source_reader_event, get_split_offset_col_idx};
45use crate::common::rate_limit::limited_chunk_size;
46use crate::executor::UpdateMutation;
47use crate::executor::prelude::*;
48use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
49use crate::task::CreateMviewProgressReporter;
50
/// Backfill progress of a single split. This is the part of the per-split state that
/// actually matters for fail-over; it is persisted via [`BackfillStateWithProgress`].
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
    /// `None` means not started yet. It's the initial state.
    /// XXX: perhaps we can also set to low-watermark instead of `None`
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
    SourceCachingUp(String),
    /// Backfill is done for this split; upstream rows for it pass through unfiltered.
    Finished,
}
/// Per-split backfill states, keyed by split id.
pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
61
/// Only `state` field is the real state for fail-over.
/// Other fields are for observability (but we still need to persist them).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct BackfillStateWithProgress {
    /// The actual fail-over state of this split's backfill.
    pub state: BackfillState,
    /// Number of rows consumed from the backfill side so far (observability only).
    pub num_consumed_rows: u64,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
    /// so that we can finish backfilling even when upstream doesn't emit any data.
    pub target_offset: Option<String>,
}
73
74impl BackfillStateWithProgress {
75    pub fn encode_to_json(self) -> JsonbVal {
76        serde_json::to_value(self).unwrap().into()
77    }
78
79    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
80        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
81    }
82}
83
/// Pairs the backfill logic ([`SourceBackfillExecutorInner`]) with its upstream executor.
pub struct SourceBackfillExecutor<S: StateStore> {
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    pub input: Executor,
}
89
pub struct SourceBackfillExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Streaming source for external
    source_id: SourceId,
    source_name: String,
    column_ids: Vec<ColumnId>,
    /// Taken (consumed) when the source reader is first built.
    source_desc_builder: Option<SourceDescBuilder>,
    /// Persists per-split backfill progress so it survives recovery.
    backfill_state_store: BackfillStateTableHandler<S>,

    /// Metrics for monitor.
    metrics: Arc<StreamingMetrics>,
    /// Counter bumped on split-change mutations, labeled by source/actor/fragment.
    source_split_change_count: LabelGuardedIntCounter,

    /// System parameter reader to read barrier interval
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    /// Reports backfill progress for the creating streaming job to Meta.
    progress: CreateMviewProgressReporter,
}
115
/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
#[derive(Debug)]
struct BackfillStage {
    /// Per-split backfill progress, keyed by split id.
    states: BackfillStates,
    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}
129
/// Mutations whose side effects are applied only after the barrier has been handled
/// (as the name suggests; e.g. they may rebuild the source reader).
enum ApplyMutationAfterBarrier {
    /// A new split assignment for this actor (from a split-change or scaling update).
    SourceChangeSplit {
        target_splits: Vec<SplitImpl>,
        // `true` for genuine split changes, `false` for scaling updates (see the
        // `Mutation::SourceChangeSplit` / `Mutation::Update` arms in `execute`).
        // NOTE(review): presumably controls trimming stale split state — confirm in the apply site.
        should_trim_state: bool,
    },
    /// Connector properties changed; reader state must be refreshed.
    ConnectorPropsChange,
}
137
impl BackfillStage {
    /// Total number of rows consumed from the backfill side across all splits
    /// (used for progress reporting).
    fn total_backfilled_rows(&self) -> u64 {
        self.states.values().map(|s| s.num_consumed_rows).sum()
    }

    /// Debug-build sanity check that `states` and `splits` cover exactly the same split
    /// ids — the invariant that makes the `unwrap()`s in this impl safe.
    fn debug_assert_consistent(&self) {
        if cfg!(debug_assertions) {
            let all_splits: HashSet<_> = self.splits.iter().map(|split| split.id()).collect();
            assert_eq!(
                self.states.keys().cloned().collect::<HashSet<_>>(),
                all_splits
            );
        }
    }

    /// Get unfinished splits with latest offsets according to the backfill states.
    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
        let mut unfinished_splits = Vec::new();
        for split in &self.splits {
            // `unwrap()` is safe: `states` covers all assigned splits (see `debug_assert_consistent`).
            let state = &self.states.get(split.id().as_ref()).unwrap().state;
            match state {
                BackfillState::Backfilling(Some(offset)) => {
                    // Stamp the backfill progress into the split so a rebuilt reader
                    // resumes from where backfilling left off.
                    let mut updated_split = split.clone();
                    updated_split.update_in_place(offset.clone())?;
                    unfinished_splits.push(updated_split);
                }
                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
                // Caught up or done: no backfill reader needed for this split.
                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
            }
        }
        Ok(unfinished_splits)
    }

    /// Updates backfill states and `target_offsets` from upstream `SourceExecutor` chunks, and
    /// returns whether the row is visible.
    ///
    /// Note that the online catch-up logic intentionally relies on the upstream chunk stream
    /// carrying split/offset columns. We do not reconcile live upstream progress from source state
    /// tables here, because source backfill is designed to advance according to the upstream data
    /// stream instead of a side channel.
    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
        let mut vis = false;
        // `unwrap()` is safe: `states` covers all assigned splits (see `debug_assert_consistent`).
        let state = self.states.get_mut(split_id).unwrap();
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(None) => {
                // backfilling for this split is not started yet. Ignore this row
            }
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // continue backfilling. Ignore this row
                    }
                    Ordering::Equal => {
                        // backfilling for this split is finished just right.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // backfilling for this split produced more data than current source's progress.
                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
                    }
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // Source caught up, but doesn't contain the last backfilled row.
                        // This may happen e.g., if Kafka performed compaction.
                        vis = true;
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Equal => {
                        // Source just caught up with backfilling.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // Source is still behind backfilling.
                    }
                }
            }
            BackfillState::Finished => {
                vis = true;
                // This split's backfilling is finished, we are waiting for other splits
            }
        }
        if matches!(state_inner, BackfillState::Backfilling(_)) {
            // While still backfilling, keep chasing the upstream's most recent offset.
            state.target_offset = Some(offset.to_owned());
        }
        if vis {
            // An upstream row only becomes visible once this split is fully finished.
            debug_assert_eq!(*state_inner, BackfillState::Finished);
        }
        vis
    }

    /// Updates backfill states and returns whether the row backfilled from external system is visible.
    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
        // `unwrap()` is safe: `states` covers all assigned splits (see `debug_assert_consistent`).
        let state = self.states.get_mut(split_id).unwrap();
        // NOTE(review): this counts every row read from the backfill reader, including ones
        // dropped as invisible below — confirm that's the intended "consumed" semantics.
        state.num_consumed_rows += 1;
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
                    // and dropping duplicated messages.
                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
                    // In this case, upstream hasn't reached the target_offset yet.
                    //
                    // Note2: after this, all following rows in the current chunk will be invisible.
                    //
                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
                    // keep backfilling.
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
                true
            }
            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
                // backfilling stopped. ignore
                false
            }
        }
    }

    /// Updates backfill state with split progress emitted by the backfill reader when there are no
    /// corresponding visible rows.
    ///
    /// This is needed for connectors such as Kafka under `read_committed`, where the reader can
    /// reach a later offset because of EOF/control records without yielding a data chunk. This
    /// supplements `handle_backfill_row`, but does not replace the upstream-chunk-based progress
    /// tracking in `handle_upstream_row`.
    fn handle_backfill_progress(&mut self, split_id: &str, offset: &str) {
        // Unlike the row handlers, progress may arrive for a split we no longer track;
        // silently ignore it instead of unwrapping.
        let Some(state) = self.states.get_mut(split_id) else {
            return;
        };
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    // Reached the target without data rows: stop backfilling and start
                    // filtering upstream rows up to `offset`.
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                // Advance the caching-up threshold if the reader reports further progress.
                if compare_kafka_offset(backfill_offset, offset).is_lt() {
                    *backfill_offset = offset.to_owned();
                }
            }
            BackfillState::Finished => {}
        }
    }
}
296
297impl<S: StateStore> SourceBackfillExecutorInner<S> {
298    #[expect(clippy::too_many_arguments)]
299    pub fn new(
300        actor_ctx: ActorContextRef,
301        info: ExecutorInfo,
302        stream_source_core: StreamSourceCore<S>,
303        metrics: Arc<StreamingMetrics>,
304        system_params: SystemParamsReaderRef,
305        backfill_state_store: BackfillStateTableHandler<S>,
306        rate_limit_rps: Option<u32>,
307        progress: CreateMviewProgressReporter,
308    ) -> Self {
309        let StreamSourceCore {
310            source_id,
311            source_name,
312            column_ids,
313            source_desc_builder,
314            ..
315        } = stream_source_core;
316        let source_split_change_count = metrics
317            .source_split_change_count
318            .with_guarded_label_values(&[
319                &source_id.to_string(),
320                &source_name,
321                &actor_ctx.id.to_string(),
322                &actor_ctx.fragment_id.to_string(),
323            ]);
324
325        Self {
326            actor_ctx,
327            info,
328            source_id,
329            source_name,
330            column_ids,
331            source_desc_builder,
332            backfill_state_store,
333            metrics,
334            source_split_change_count,
335            system_params,
336            rate_limit_rps,
337            progress,
338        }
339    }
340
341    async fn build_stream_source_reader(
342        &self,
343        source_desc: &SourceDesc,
344        splits: Vec<SplitImpl>,
345    ) -> StreamExecutorResult<(BoxSourceReaderEventStream, HashMap<SplitId, BackfillInfo>)> {
346        let column_ids = source_desc
347            .columns
348            .iter()
349            .map(|column_desc| column_desc.column_id)
350            .collect_vec();
351        let source_ctx = SourceContext::new(
352            self.actor_ctx.id,
353            self.source_id,
354            self.actor_ctx.fragment_id,
355            self.source_name.clone(),
356            source_desc.metrics.clone(),
357            SourceCtrlOpts {
358                chunk_size: limited_chunk_size(self.rate_limit_rps),
359                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
360            },
361            source_desc.source.config.clone(),
362            None,
363        );
364
365        // We will check watermark to decide whether we need to backfill.
366        // e.g., when there's a Kafka topic-partition without any data,
367        // we don't need to backfill at all. But if we do not check here,
368        // the executor can only know it's finished when data coming in.
369        // For blocking DDL, this would be annoying.
370
371        let (stream, res) = source_desc
372            .source
373            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
374            .await
375            .map_err(StreamExecutorError::connector_error)?;
376        Ok((
377            apply_rate_limit_to_source_reader_event(stream, self.rate_limit_rps).boxed(),
378            res.backfill_info,
379        ))
380    }
381
382    #[try_stream(ok = Message, error = StreamExecutorError)]
383    async fn execute(mut self, input: Executor) {
384        let mut input = input.execute();
385
386        // Poll the upstream to get the first barrier.
387        let barrier = expect_first_barrier(&mut input).await?;
388        let first_epoch = barrier.epoch;
389        let owned_splits = barrier
390            .initial_split_assignment(self.actor_ctx.id)
391            .unwrap_or(&[])
392            .to_vec();
393
394        let mut pause_control = PauseControl::new();
395        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
396            pause_control.backfill_pause();
397        }
398        if barrier.is_pause_on_startup() {
399            pause_control.command_pause();
400        }
401        yield Message::Barrier(barrier);
402
403        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
404        let mut source_desc = source_desc_builder
405            .build()
406            .map_err(StreamExecutorError::connector_error)?;
407
408        // source backfill only applies to kafka, so we don't need to get pulsar's `message_id_data_idx`.
409        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
410        else {
411            unreachable!("Partition and offset columns must be set.");
412        };
413
414        self.backfill_state_store.init_epoch(first_epoch).await?;
415
416        let mut backfill_states: BackfillStates = HashMap::new();
417        {
418            let committed_reader = self
419                .backfill_state_store
420                .new_committed_reader(first_epoch)
421                .await?;
422            for split in &owned_splits {
423                let split_id = split.id();
424                let backfill_state = committed_reader
425                    .try_recover_from_state_store(&split_id)
426                    .await?
427                    .unwrap_or(BackfillStateWithProgress {
428                        state: BackfillState::Backfilling(None),
429                        num_consumed_rows: 0,
430                        target_offset: None, // init with None
431                    });
432                backfill_states.insert(split_id, backfill_state);
433            }
434        }
435        let mut backfill_stage = BackfillStage {
436            states: backfill_states,
437            splits: owned_splits,
438        };
439        backfill_stage.debug_assert_consistent();
440
441        let (source_chunk_reader, backfill_info) = self
442            .build_stream_source_reader(
443                &source_desc,
444                backfill_stage.get_latest_unfinished_splits()?,
445            )
446            .instrument_await("source_build_reader")
447            .await?;
448        for (split_id, info) in &backfill_info {
449            let state = backfill_stage.states.get_mut(split_id).unwrap();
450            match info {
451                BackfillInfo::NoDataToBackfill => {
452                    state.state = BackfillState::Finished;
453                }
454                BackfillInfo::HasDataToBackfill { latest_offset } => {
455                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
456                    state.target_offset = Some(latest_offset.clone());
457                }
458            }
459        }
460        tracing::debug!(?backfill_stage, "source backfill started");
461
462        fn select_strategy(_: &mut ()) -> PollNext {
463            futures::stream::PollNext::Left
464        }
465
466        // We choose "preferring upstream" strategy here, because:
467        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
468        //   For chunks from upstream, they are simply dropped, so there's no much overhead.
469        //   So possibly this can also affect other running jobs less.
470        // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
471        let mut backfill_stream = select_with_strategy(
472            input.by_ref().map(Either::Left),
473            source_chunk_reader.map(Either::Right),
474            select_strategy,
475        );
476
477        type PausedReader = Option<impl Stream>;
478        let mut paused_reader: PausedReader = None;
479
480        macro_rules! pause_reader {
481            () => {
482                if !pause_control.reader_paused {
483                    let (left, right) = backfill_stream.into_inner();
484                    backfill_stream = select_with_strategy(
485                        left,
486                        futures::stream::pending().boxed().map(Either::Right),
487                        select_strategy,
488                    );
489                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
490                    paused_reader = Some(right);
491                    pause_control.reader_paused = true;
492                }
493            };
494        }
495
496        macro_rules! resume_reader {
497            () => {
498                if pause_control.reader_paused {
499                    backfill_stream = select_with_strategy(
500                        input.by_ref().map(Either::Left),
501                        paused_reader
502                            .take()
503                            .expect("should have paused reader to resume"),
504                        select_strategy,
505                    );
506                    pause_control.reader_paused = false;
507                }
508            };
509        }
510
511        if pause_control.is_paused() {
512            pause_reader!();
513        }
514
515        let state_store = self
516            .backfill_state_store
517            .state_store()
518            .state_store()
519            .clone();
520        let table_id = self.backfill_state_store.state_store().table_id();
521        let mut state_table_initialized = false;
522        {
523            let source_backfill_row_count = self
524                .metrics
525                .source_backfill_row_count
526                .with_guarded_label_values(&[
527                    &self.source_id.to_string(),
528                    &self.source_name,
529                    &self.actor_ctx.id.to_string(),
530                    &self.actor_ctx.fragment_id.to_string(),
531                ]);
532
533            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
534            // milliseconds, considering some other latencies like network and cost in Meta.
535            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
536                as u128
537                * WAIT_BARRIER_MULTIPLE_TIMES;
538            let mut last_barrier_time = Instant::now();
539
540            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
541            'backfill_loop: while let Some(either) = backfill_stream.next().await {
542                match either {
543                    // Upstream
544                    Either::Left(msg) => {
545                        let Ok(msg) = msg else {
546                            let e = msg.unwrap_err();
547                            tracing::error!(
548                                error = ?e.as_report(),
549                                source_id = %self.source_id,
550                                "stream source reader error",
551                            );
552                            GLOBAL_ERROR_METRICS.user_source_error.report([
553                                "SourceReaderError".to_owned(),
554                                self.source_id.to_string(),
555                                self.source_name.clone(),
556                                self.actor_ctx.fragment_id.to_string(),
557                            ]);
558
559                            let (reader, _backfill_info) = self
560                                .build_stream_source_reader(
561                                    &source_desc,
562                                    backfill_stage.get_latest_unfinished_splits()?,
563                                )
564                                .await?;
565
566                            backfill_stream = select_with_strategy(
567                                input.by_ref().map(Either::Left),
568                                reader.map(Either::Right),
569                                select_strategy,
570                            );
571                            continue;
572                        };
573                        match msg {
574                            Message::Barrier(barrier) => {
575                                last_barrier_time = Instant::now();
576
577                                if pause_control.self_resume() {
578                                    resume_reader!();
579                                }
580
581                                let mut maybe_muatation = None;
582                                if let Some(ref mutation) = barrier.mutation.as_deref() {
583                                    match mutation {
584                                        Mutation::Pause => {
585                                            // pause_reader should not be invoked consecutively more than once.
586                                            pause_control.command_pause();
587                                            pause_reader!();
588                                        }
589                                        Mutation::Resume => {
590                                            // pause_reader.take should not be invoked consecutively more than once.
591                                            if pause_control.command_resume() {
592                                                resume_reader!();
593                                            }
594                                        }
595                                        Mutation::StartFragmentBackfill { fragment_ids } => {
596                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
597                                                && pause_control.backfill_resume()
598                                            {
599                                                resume_reader!();
600                                            }
601                                        }
602                                        Mutation::SourceChangeSplit(actor_splits) => {
603                                            tracing::info!(
604                                                actor_splits = ?actor_splits,
605                                                "source change split received"
606                                            );
607                                            maybe_muatation = actor_splits
608                                                .get(&self.actor_ctx.id)
609                                                .cloned()
610                                                .map(|target_splits| {
611                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
612                                                        target_splits,
613                                                        should_trim_state: true,
614                                                    }
615                                                });
616                                        }
617                                        Mutation::Update(UpdateMutation {
618                                            actor_splits, ..
619                                        }) => {
620                                            maybe_muatation = actor_splits
621                                                .get(&self.actor_ctx.id)
622                                                .cloned()
623                                                .map(|target_splits| {
624                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
625                                                        target_splits,
626                                                        should_trim_state: false,
627                                                    }
628                                                });
629                                        }
630                                        Mutation::ConnectorPropsChange(maybe_mutation) => {
631                                            if let Some(props_plaintext) =
632                                                maybe_mutation.get(&self.source_id.as_raw_id())
633                                            {
634                                                source_desc
635                                                    .update_reader(props_plaintext.clone())?;
636
637                                                maybe_muatation = Some(
638                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
639                                                );
640                                            }
641                                        }
642                                        Mutation::Throttle(fragment_to_apply) => {
643                                            if let Some(entry) =
644                                                fragment_to_apply.get(&self.actor_ctx.fragment_id)
645                                                && entry.throttle_type() == ThrottleType::Backfill
646                                                && entry.rate_limit != self.rate_limit_rps
647                                            {
648                                                tracing::info!(
649                                                    "updating rate limit from {:?} to {:?}",
650                                                    self.rate_limit_rps,
651                                                    entry.rate_limit
652                                                );
653                                                self.rate_limit_rps = entry.rate_limit;
654                                                // rebuild reader
655                                                let (reader, _backfill_info) = self
656                                                    .build_stream_source_reader(
657                                                        &source_desc,
658                                                        backfill_stage
659                                                            .get_latest_unfinished_splits()?,
660                                                    )
661                                                    .await?;
662
663                                                backfill_stream = select_with_strategy(
664                                                    input.by_ref().map(Either::Left),
665                                                    reader.map(Either::Right),
666                                                    select_strategy,
667                                                );
668                                            }
669                                        }
670                                        _ => {}
671                                    }
672                                }
                                // Builds a fresh source reader over the latest unfinished splits,
                                // for the caller to re-select with the upstream input after a
                                // split change was applied.
                                //
                                // Written as a free `async fn` taking `this` explicitly (rather
                                // than a method on the executor), due to some complex lifetime
                                // issues with the surrounding generator.
                                async fn rebuild_reader_on_split_changed(
                                    this: &SourceBackfillExecutorInner<impl StateStore>,
                                    backfill_stage: &BackfillStage,
                                    source_desc: &SourceDesc,
                                ) -> StreamExecutorResult<BoxSourceReaderEventStream>
                                {
                                    // rebuild backfill_stream
                                    // Note: we don't put this part in a method, due to some complex lifetime issues.

                                    // Only splits still being backfilled need a reader; finished
                                    // splits are served from the upstream side.
                                    let latest_unfinished_splits =
                                        backfill_stage.get_latest_unfinished_splits()?;
                                    tracing::info!(
                                        "actor {:?} apply source split change to {:?}",
                                        this.actor_ctx.id,
                                        latest_unfinished_splits
                                    );

                                    // Replace the source reader with a new one of the new state.
                                    // (The second tuple element, backfill info, is not needed when
                                    // rebuilding mid-stream.)
                                    let (reader, _backfill_info) = this
                                        .build_stream_source_reader(
                                            source_desc,
                                            latest_unfinished_splits,
                                        )
                                        .await?;

                                    Ok(reader)
                                }
700
701                                self.backfill_state_store
702                                    .set_states(backfill_stage.states.clone())
703                                    .await?;
704                                self.backfill_state_store.commit(barrier.epoch).await?;
705
706                                if self.should_report_finished(&backfill_stage.states) {
707                                    // drop the backfill kafka consumers
708                                    backfill_stream = select_with_strategy(
709                                        input.by_ref().map(Either::Left),
710                                        futures::stream::pending().boxed().map(Either::Right),
711                                        select_strategy,
712                                    );
713
714                                    self.progress.finish(
715                                        barrier.epoch,
716                                        backfill_stage.total_backfilled_rows(),
717                                    );
718
719                                    let barrier_epoch = barrier.epoch;
720                                    let is_checkpoint = barrier.is_checkpoint();
721                                    // yield barrier after reporting progress
722                                    yield Message::Barrier(barrier);
723
724                                    if let Some(to_apply_mutation) = maybe_muatation {
725                                        self.apply_split_change_after_yield_barrier(
726                                            barrier_epoch,
727                                            &mut backfill_stage,
728                                            to_apply_mutation,
729                                        )
730                                        .await?;
731                                    }
732
733                                    if !state_table_initialized {
734                                        if is_checkpoint {
735                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
736                                            // We wait for 2nd epoch
737                                            let epoch = barrier_epoch.prev;
738                                            tracing::info!("waiting for epoch: {}", epoch);
739                                            state_store
740                                                .try_wait_epoch(
741                                                    HummockReadEpoch::Committed(epoch),
742                                                    TryWaitEpochOptions { table_id },
743                                                )
744                                                .await?;
745                                            tracing::info!("finished waiting for epoch: {}", epoch);
746                                            state_table_initialized = true;
747                                        }
748                                    } else {
749                                        // After we reported finished, we still don't exit the loop.
750                                        // Because we need to handle split migration.
751                                        assert!(
752                                            state_table_initialized,
753                                            "state table should be initialized before checking backfill finished"
754                                        );
755                                        if self.backfill_finished(&backfill_stage.states).await? {
756                                            tracing::info!("source backfill finished");
757                                            break 'backfill_loop;
758                                        }
759                                    }
760                                } else {
761                                    self.progress.update_for_source_backfill(
762                                        barrier.epoch,
763                                        backfill_stage.total_backfilled_rows(),
764                                    );
765
766                                    let barrier_epoch = barrier.epoch;
767                                    // yield barrier after reporting progress
768                                    yield Message::Barrier(barrier);
769
770                                    if let Some(to_apply_mutation) = maybe_muatation
771                                        && self
772                                            .apply_split_change_after_yield_barrier(
773                                                barrier_epoch,
774                                                &mut backfill_stage,
775                                                to_apply_mutation,
776                                            )
777                                            .await?
778                                    {
779                                        let reader = rebuild_reader_on_split_changed(
780                                            &self,
781                                            &backfill_stage,
782                                            &source_desc,
783                                        )
784                                        .await?;
785
786                                        backfill_stream = select_with_strategy(
787                                            input.by_ref().map(Either::Left),
788                                            reader.map(Either::Right),
789                                            select_strategy,
790                                        );
791                                    }
792                                }
793                            }
794                            Message::Chunk(chunk) => {
795                                // We need to iterate over all rows because there might be multiple splits in a chunk.
796                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
797                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
798
799                                for (i, (_, row)) in chunk.rows().enumerate() {
800                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
801                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
802                                    let vis = backfill_stage.handle_upstream_row(split, offset);
803                                    new_vis.set(i, vis);
804                                }
805                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
806                                let new_vis = new_vis.finish();
807                                if new_vis.count_ones() != 0 {
808                                    let new_chunk = chunk.clone_with_vis(new_vis);
809                                    yield Message::Chunk(new_chunk);
810                                }
811                            }
812                            Message::Watermark(_) => {
813                                // Ignore watermark during backfill.
814                            }
815                        }
816                    }
817                    // backfill
818                    Either::Right(msg) => {
819                        let chunk = match msg? {
820                            SourceReaderEvent::DataChunk(chunk) => chunk,
821                            SourceReaderEvent::SplitProgress(split_progress) => {
822                                // `SplitProgress` is consumed only from the backfill reader side.
823                                // The upstream side of source backfill still advances based on
824                                // normal chunks with split/offset columns.
825                                for (split_id, offset) in split_progress {
826                                    let previous_state =
827                                        backfill_stage.states.get(&split_id).cloned();
828                                    backfill_stage
829                                        .handle_backfill_progress(split_id.as_ref(), &offset);
830                                    let updated_state =
831                                        backfill_stage.states.get(&split_id).cloned();
832                                    if previous_state != updated_state {
833                                        tracing::info!(
834                                            actor_id = %self.actor_ctx.id,
835                                            fragment_id = %self.actor_ctx.fragment_id,
836                                            source_id = %self.source_id,
837                                            source_name = self.source_name,
838                                            split_id = %split_id,
839                                            applied_offset = %offset,
840                                            previous_state = ?previous_state,
841                                            updated_state = ?updated_state,
842                                            "source backfill executor applied split progress offset"
843                                        );
844                                    }
845                                }
846                                continue;
847                            }
848                        };
849
850                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
851                            // Pause to let barrier catch up via backpressure of snapshot stream.
852                            pause_control.self_pause();
853                            pause_reader!();
854
855                            // Exceeds the max wait barrier time, the source will be paused.
856                            // Currently we can guarantee the
857                            // source is not paused since it received stream
858                            // chunks.
859                            tracing::warn!(
860                                "source {} paused, wait barrier for {:?}",
861                                self.info.identity,
862                                last_barrier_time.elapsed()
863                            );
864
865                            // Only update `max_wait_barrier_time_ms` to capture
866                            // `barrier_interval_ms`
867                            // changes here to avoid frequently accessing the shared
868                            // `system_params`.
869                            max_wait_barrier_time_ms =
870                                self.system_params.load().barrier_interval_ms() as u128
871                                    * WAIT_BARRIER_MULTIPLE_TIMES;
872                        }
873                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
874
875                        for (i, (_, row)) in chunk.rows().enumerate() {
876                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
877                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
878                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
879                            new_vis.set(i, vis);
880                        }
881
882                        let new_vis = new_vis.finish();
883                        let card = new_vis.count_ones();
884                        if card != 0 {
885                            let new_chunk = chunk.clone_with_vis(new_vis);
886                            yield Message::Chunk(new_chunk);
887                            source_backfill_row_count.inc_by(card as u64);
888                        }
889                    }
890                }
891            }
892        }
893
894        std::mem::drop(backfill_stream);
895        let mut states = backfill_stage.states;
896        // Make sure `Finished` state is persisted.
897        self.backfill_state_store.set_states(states.clone()).await?;
898
899        // All splits finished backfilling. Now we only forward the source data.
900        #[for_await]
901        for msg in input {
902            let msg = msg?;
903            match msg {
904                Message::Barrier(barrier) => {
905                    let mut split_changed = None;
906                    if let Some(ref mutation) = barrier.mutation.as_deref() {
907                        match mutation {
908                            Mutation::Pause | Mutation::Resume => {
909                                // We don't need to do anything. Handled by upstream.
910                            }
911                            Mutation::SourceChangeSplit(actor_splits) => {
912                                tracing::info!(
913                                    actor_splits = ?actor_splits,
914                                    "source change split received"
915                                );
916                                split_changed = actor_splits
917                                    .get(&self.actor_ctx.id)
918                                    .cloned()
919                                    .map(|target_splits| (target_splits, &mut states, true));
920                            }
921                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
922                                split_changed = actor_splits
923                                    .get(&self.actor_ctx.id)
924                                    .cloned()
925                                    .map(|target_splits| (target_splits, &mut states, false));
926                            }
927                            _ => {}
928                        }
929                    }
930                    self.backfill_state_store.commit(barrier.epoch).await?;
931                    let barrier_epoch = barrier.epoch;
932                    yield Message::Barrier(barrier);
933
934                    if let Some((target_splits, state, should_trim_state)) = split_changed {
935                        self.apply_split_change_forward_stage_after_yield_barrier(
936                            barrier_epoch,
937                            target_splits,
938                            state,
939                            should_trim_state,
940                        )
941                        .await?;
942                    }
943                }
944                Message::Chunk(chunk) => {
945                    yield Message::Chunk(chunk);
946                }
947                Message::Watermark(watermark) => {
948                    yield Message::Watermark(watermark);
949                }
950            }
951        }
952    }
953
954    /// When we should call `progress.finish()` to let blocking DDL return.
955    /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
956    ///
957    /// Note: split migration (online scaling) is related with progress tracking.
958    /// - For foreground DDL, scaling is not allowed before progress is finished.
959    /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
960    ///
961    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
962    fn should_report_finished(&self, states: &BackfillStates) -> bool {
963        states.values().all(|state| {
964            matches!(
965                state.state,
966                BackfillState::Finished | BackfillState::SourceCachingUp(_)
967            )
968        })
969    }
970
971    /// All splits entered `Finished` state.
972    ///
973    /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
974    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
975    /// this actor, we still need to backfill it.
976    ///
977    /// Note: at the beginning, the actor will only read the state written by itself.
978    /// It needs to _wait until it can read all actors' written data_.
979    /// i.e., wait for the second checkpoint has been available.
980    ///
981    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
982    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
983        Ok(states
984            .values()
985            .all(|state| matches!(state.state, BackfillState::Finished))
986            && self
987                .backfill_state_store
988                .scan_may_stale()
989                .await?
990                .into_iter()
991                .all(|state| matches!(state.state, BackfillState::Finished)))
992    }
993
994    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
995    async fn apply_split_change_after_yield_barrier(
996        &mut self,
997        barrier_epoch: EpochPair,
998        stage: &mut BackfillStage,
999        to_apply_mutation: ApplyMutationAfterBarrier,
1000    ) -> StreamExecutorResult<bool> {
1001        match to_apply_mutation {
1002            ApplyMutationAfterBarrier::SourceChangeSplit {
1003                target_splits,
1004                should_trim_state,
1005            } => {
1006                self.source_split_change_count.inc();
1007                {
1008                    if self
1009                        .update_state_if_changed(
1010                            barrier_epoch,
1011                            target_splits,
1012                            stage,
1013                            should_trim_state,
1014                        )
1015                        .await?
1016                    {
1017                        // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
1018                        Ok(true)
1019                    } else {
1020                        Ok(false)
1021                    }
1022                }
1023            }
1024            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
1025        }
1026    }
1027
1028    /// Returns `true` if split changed. Otherwise `false`.
1029    async fn update_state_if_changed(
1030        &mut self,
1031        barrier_epoch: EpochPair,
1032        target_splits: Vec<SplitImpl>,
1033        stage: &mut BackfillStage,
1034        should_trim_state: bool,
1035    ) -> StreamExecutorResult<bool> {
1036        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
1037
1038        let mut split_changed = false;
1039        // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
1040        // Will be set to target_state in the end.
1041        let old_states = std::mem::take(&mut stage.states);
1042        let committed_reader = self
1043            .backfill_state_store
1044            .new_committed_reader(barrier_epoch)
1045            .await?;
1046        // Iterate over the target (assigned) splits
1047        // - check if any new splits are added
1048        // - build target_state
1049        for split in &target_splits {
1050            let split_id = split.id();
1051            if let Some(s) = old_states.get(&split_id) {
1052                target_state.insert(split_id, s.clone());
1053            } else {
1054                split_changed = true;
1055
1056                let backfill_state = committed_reader
1057                    .try_recover_from_state_store(&split_id)
1058                    .await?;
1059                match backfill_state {
1060                    None => {
1061                        // Newly added split. We don't need to backfill.
1062                        // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
1063                        target_state.insert(
1064                            split_id,
1065                            BackfillStateWithProgress {
1066                                state: BackfillState::Finished,
1067                                num_consumed_rows: 0,
1068                                target_offset: None,
1069                            },
1070                        );
1071                    }
1072                    Some(backfill_state) => {
1073                        // Migrated split. Backfill if unfinished.
1074                        target_state.insert(split_id, backfill_state);
1075                    }
1076                }
1077            }
1078        }
1079
1080        // Checks dropped splits
1081        for existing_split_id in old_states.keys() {
1082            if !target_state.contains_key(existing_split_id) {
1083                tracing::info!("split dropping detected: {}", existing_split_id);
1084                split_changed = true;
1085            }
1086        }
1087
1088        if split_changed {
1089            let dropped_splits = stage
1090                .states
1091                .extract_if(|split_id, _| !target_state.contains_key(split_id))
1092                .map(|(split_id, _)| split_id);
1093
1094            if should_trim_state {
1095                // trim dropped splits' state
1096                self.backfill_state_store.trim_state(dropped_splits).await?;
1097            }
1098            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
1099        } else {
1100            debug_assert_eq!(old_states, target_state);
1101        }
1102        stage.states = target_state;
1103        stage.splits = target_splits;
1104        stage.debug_assert_consistent();
1105        Ok(split_changed)
1106    }
1107
1108    /// For split change during forward stage, all newly added splits should be already finished.
1109    // We just need to update the state store if necessary.
1110    async fn apply_split_change_forward_stage_after_yield_barrier(
1111        &mut self,
1112        barrier_epoch: EpochPair,
1113        target_splits: Vec<SplitImpl>,
1114        states: &mut BackfillStates,
1115        should_trim_state: bool,
1116    ) -> StreamExecutorResult<()> {
1117        self.source_split_change_count.inc();
1118        {
1119            self.update_state_if_changed_forward_stage(
1120                barrier_epoch,
1121                target_splits,
1122                states,
1123                should_trim_state,
1124            )
1125            .await?;
1126        }
1127
1128        Ok(())
1129    }
1130
1131    async fn update_state_if_changed_forward_stage(
1132        &mut self,
1133        barrier_epoch: EpochPair,
1134        target_splits: Vec<SplitImpl>,
1135        states: &mut BackfillStates,
1136        should_trim_state: bool,
1137    ) -> StreamExecutorResult<()> {
1138        let target_splits: HashSet<SplitId> =
1139            target_splits.into_iter().map(|split| split.id()).collect();
1140
1141        let mut split_changed = false;
1142        let mut newly_added_splits = vec![];
1143
1144        let committed_reader = self
1145            .backfill_state_store
1146            .new_committed_reader(barrier_epoch)
1147            .await?;
1148
1149        // Checks added splits
1150        for split_id in &target_splits {
1151            if !states.contains_key(split_id) {
1152                split_changed = true;
1153
1154                let backfill_state = committed_reader
1155                    .try_recover_from_state_store(split_id)
1156                    .await?;
1157                match &backfill_state {
1158                    None => {
1159                        // Newly added split. We don't need to backfill!
1160                        newly_added_splits.push(split_id.clone());
1161                    }
1162                    Some(backfill_state) => {
1163                        // Migrated split. It should also be finished since we are in forwarding stage.
1164                        match backfill_state.state {
1165                            BackfillState::Finished => {}
1166                            _ => {
1167                                return Err(anyhow::anyhow!(
1168                                    "Unexpected backfill state: {:?}",
1169                                    backfill_state
1170                                )
1171                                .into());
1172                            }
1173                        }
1174                    }
1175                }
1176                states.insert(
1177                    split_id.clone(),
1178                    backfill_state.unwrap_or(BackfillStateWithProgress {
1179                        state: BackfillState::Finished,
1180                        num_consumed_rows: 0,
1181                        target_offset: None,
1182                    }),
1183                );
1184            }
1185        }
1186
1187        // Checks dropped splits
1188        for existing_split_id in states.keys() {
1189            if !target_splits.contains(existing_split_id) {
1190                tracing::info!("split dropping detected: {}", existing_split_id);
1191                split_changed = true;
1192            }
1193        }
1194
1195        if split_changed {
1196            tracing::info!(
1197                target_splits = ?target_splits,
1198                "apply split change"
1199            );
1200
1201            let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1202
1203            if should_trim_state {
1204                // trim dropped splits' state
1205                self.backfill_state_store
1206                    .trim_state(dropped_splits.map(|(k, _v)| k))
1207                    .await?;
1208            }
1209
1210            // For migrated splits, and existing splits, we do not need to update
1211            // state store, but only for newly added splits.
1212            self.backfill_state_store
1213                .set_states(
1214                    newly_added_splits
1215                        .into_iter()
1216                        .map(|split_id| {
1217                            (
1218                                split_id,
1219                                BackfillStateWithProgress {
1220                                    state: BackfillState::Finished,
1221                                    num_consumed_rows: 0,
1222                                    target_offset: None,
1223                                },
1224                            )
1225                        })
1226                        .collect(),
1227                )
1228                .await?;
1229        }
1230
1231        Ok(())
1232    }
1233}
1234
/// Compares two Kafka offsets, which are encoded as decimal integer strings.
///
/// # Panics
/// Panics if either offset is not a valid `i64` string — this indicates corrupted
/// offset state rather than a recoverable condition.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let a: i64 = a.parse().expect("kafka offset should be an i64 integer string");
    let b: i64 = b.parse().expect("kafka offset should be an i64 integer string");
    a.cmp(&b)
}
1240
impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Thin adapter: drive the inner executor with the stored upstream input
        // and box the resulting message stream.
        self.inner.execute(self.input).boxed()
    }
}
1246
1247impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1248    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1249        f.debug_struct("SourceBackfillExecutor")
1250            .field("source_id", &self.source_id)
1251            .field("column_ids", &self.column_ids)
1252            .field("stream_key", &self.info.stream_key)
1253            .finish()
1254    }
1255}
1256
/// Tracks the independent reasons the backfill reader may be paused, so that
/// resuming for one reason does not override a pause requested for another.
struct PauseControl {
    // Paused due to backfill order control
    backfill_paused: bool,
    // Paused due to self-pause, e.g. let barrier catch up
    self_paused: bool,
    // Paused due to Pause command from meta, pause_on_next_bootstrap
    command_paused: bool,
    // Whether the reader stream itself is currently paused
    // (the actual stream state, while the flags above record the reasons).
    reader_paused: bool,
}
1267
1268impl PauseControl {
1269    fn new() -> Self {
1270        Self {
1271            backfill_paused: false,
1272            self_paused: false,
1273            command_paused: false,
1274            reader_paused: false,
1275        }
1276    }
1277
1278    fn is_paused(&self) -> bool {
1279        self.backfill_paused || self.command_paused || self.self_paused
1280    }
1281
1282    /// returns whether we need to pause the reader.
1283    fn backfill_pause(&mut self) {
1284        if self.backfill_paused {
1285            tracing::warn!("backfill_pause invoked twice");
1286        }
1287        self.backfill_paused = true;
1288    }
1289
1290    /// returns whether we need to resume the reader.
1291    /// same precedence as command.
1292    fn backfill_resume(&mut self) -> bool {
1293        if !self.backfill_paused {
1294            tracing::warn!("backfill_resume invoked twice");
1295        }
1296        !self.command_paused
1297    }
1298
1299    /// returns whether we need to pause the reader.
1300    fn self_pause(&mut self) {
1301        assert!(
1302            !self.backfill_paused,
1303            "backfill stream should not be read when backfill_pause is set"
1304        );
1305        assert!(
1306            !self.command_paused,
1307            "backfill stream should not be read when command_pause is set"
1308        );
1309        if self.self_paused {
1310            tracing::warn!("self_pause invoked twice");
1311        }
1312        self.self_paused = true;
1313    }
1314
1315    /// returns whether we need to resume the reader.
1316    /// `self_resume` has the lowest precedence,
1317    /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
1318    fn self_resume(&mut self) -> bool {
1319        self.self_paused = false;
1320        !(self.backfill_paused || self.command_paused)
1321    }
1322
1323    /// returns whether we need to pause the reader.
1324    fn command_pause(&mut self) {
1325        if self.command_paused {
1326            tracing::warn!("command_pause invoked twice");
1327        }
1328        self.command_paused = true;
1329    }
1330
1331    /// returns whether we need to resume the reader.
1332    /// same precedence as backfill.
1333    fn command_resume(&mut self) -> bool {
1334        if !self.command_paused {
1335            tracing::warn!("command_resume invoked twice");
1336        }
1337        self.command_paused = false;
1338        !self.backfill_paused
1339    }
1340}