risingwave_stream/executor/source/source_backfill_executor.rs
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::time::Instant;

use anyhow::anyhow;
use either::Either;
use futures::stream::{PollNext, select_with_strategy};
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::catalog::ColumnId;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::JsonbVal;
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
    SplitMetaData,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::TryWaitEpochOptions;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

use super::executor_core::StreamSourceCore;
use super::source_backfill_state_table::BackfillStateTableHandler;
use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::task::CreateMviewProgressReporter;

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
    /// `None` means backfilling hasn't started yet. It's the initial state.
    /// XXX: perhaps we can also set it to the low watermark instead of `None`.
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). The source needs to filter out messages
    /// at or before this offset.
    SourceCachingUp(String),
    Finished,
}
pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
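
// Illustrative lifecycle of a single split's state, as driven by `handle_backfill_row` and
// `handle_upstream_row` below (a sketch, not exhaustive; e.g., `SourceCachingUp` is skipped
// when upstream catches up with the backfill exactly):
//
//   Backfilling(None) -> Backfilling(Some("10")) -> SourceCachingUp("10") -> Finished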

/// Only the `state` field is the real state used for fail-over.
/// The other fields are for observability (but we still need to persist them).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct BackfillStateWithProgress {
    pub state: BackfillState,
    pub num_consumed_rows: u64,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop
    /// backfilling. This is initialized with the latest available offset in the connector (if
    /// the connector provides the ability to fetch it), so that we can finish backfilling even
    /// when upstream doesn't emit any data.
    pub target_offset: Option<String>,
}

impl BackfillStateWithProgress {
    pub fn encode_to_json(self) -> JsonbVal {
        serde_json::to_value(self).unwrap().into()
    }

    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
    }
}
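
// For reference, the JSON persisted by `encode_to_json` uses serde's default (externally
// tagged) enum representation, e.g. (illustrative):
//   {"state":{"SourceCachingUp":"42"},"num_consumed_rows":100,"target_offset":"41"}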

pub struct SourceBackfillExecutor<S: StateStore> {
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`.
    pub input: Executor,
}

pub struct SourceBackfillExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Streaming source info for the external system.
    source_id: SourceId,
    source_name: String,
    column_ids: Vec<ColumnId>,
    source_desc_builder: Option<SourceDescBuilder>,
    backfill_state_store: BackfillStateTableHandler<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,
    source_split_change_count: LabelGuardedIntCounter,

    // /// Receiver of barrier channel.
    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
    /// System parameter reader to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    progress: CreateMviewProgressReporter,
}

/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all assigned splits, so we can `unwrap()` safely on `get()`.
#[derive(Debug)]
struct BackfillStage {
    states: BackfillStates,
    /// A copy of all splits (including both unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not kept up to date. Use the offsets in `states` to update a split
    /// before using it (see `get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}

enum ApplyMutationAfterBarrier {
    SourceChangeSplit {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    },
    ConnectorPropsChange,
}

impl BackfillStage {
    fn total_backfilled_rows(&self) -> u64 {
        self.states.values().map(|s| s.num_consumed_rows).sum()
    }

    fn debug_assert_consistent(&self) {
        if cfg!(debug_assertions) {
            let all_splits: HashSet<_> = self.splits.iter().map(|split| split.id()).collect();
            assert_eq!(
                self.states.keys().cloned().collect::<HashSet<_>>(),
                all_splits
            );
        }
    }

    /// Gets the unfinished splits with their latest offsets according to the backfill states.
    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
        let mut unfinished_splits = Vec::new();
        for split in &self.splits {
            let state = &self.states.get(split.id().as_ref()).unwrap().state;
            match state {
                BackfillState::Backfilling(Some(offset)) => {
                    let mut updated_split = split.clone();
                    updated_split.update_in_place(offset.clone())?;
                    unfinished_splits.push(updated_split);
                }
                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
            }
        }
        Ok(unfinished_splits)
    }

    /// Updates the backfill state and `target_offset`, and returns whether the row from the
    /// upstream `SourceExecutor` is visible.
    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
        let mut vis = false;
        let state = self.states.get_mut(split_id).unwrap();
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(None) => {
                // Backfilling for this split hasn't started yet. Ignore this row.
            }
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // Continue backfilling. Ignore this row.
                    }
                    Ordering::Equal => {
                        // Backfilling for this split is finished exactly at this row.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // Backfilling for this split has produced more data than the source's
                        // current progress. We should stop backfilling and filter out rows from
                        // upstream with offset <= backfill_offset.
                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
                    }
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // The source caught up, but it doesn't contain the last backfilled row.
                        // This may happen, e.g., if Kafka performed compaction.
                        vis = true;
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Equal => {
                        // The source just caught up with backfilling. This row duplicates the
                        // last backfilled row, so it stays invisible.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // The source is still behind backfilling.
                    }
                }
            }
            BackfillState::Finished => {
                vis = true;
                // This split's backfilling is finished; we are waiting for other splits.
            }
        }
        if matches!(state_inner, BackfillState::Backfilling(_)) {
            state.target_offset = Some(offset.to_owned());
        }
        if vis {
            debug_assert_eq!(*state_inner, BackfillState::Finished);
        }
        vis
    }

    /// Updates the backfill state and returns whether the row backfilled from the external
    /// system is visible.
    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
        let state = self.states.get_mut(split_id).unwrap();
        state.num_consumed_rows += 1;
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    // Note 1: if target_offset == offset, it seems we could mark the state as
                    // Finished directly, without waiting for upstream to catch up and dropping
                    // duplicated messages. But that doesn't hold if target_offset was fetched
                    // from somewhere else, like the Kafka high watermark: in that case,
                    // upstream hasn't reached the target_offset yet.
                    //
                    // Note 2: after this, all following rows in the current chunk will be
                    // invisible.
                    //
                    // Note 3: if target_offset is None (e.g., when upstream doesn't emit any
                    // messages at all), we will keep backfilling.
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
                true
            }
            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
                // Backfilling has stopped. Ignore this row.
                false
            }
        }
    }
}
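
// Worked example of the two handlers above (illustrative, with Kafka-style integer offsets):
// suppose a split is at `Backfilling(Some("5"))` with `target_offset = Some("8")`.
// - `handle_backfill_row(split, "8")` sees 8 >= 8 and moves the state to
//   `SourceCachingUp("8")`; this backfilled row is still emitted.
// - `handle_upstream_row(split, "8")` then sees upstream has caught up exactly (`Equal`),
//   filters this duplicate row out, and moves the state to `Finished`.
// - Subsequent upstream rows ("9", "10", ...) hit `Finished` and are forwarded.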

impl<S: StateStore> SourceBackfillExecutorInner<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        info: ExecutorInfo,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        system_params: SystemParamsReaderRef,
        backfill_state_store: BackfillStateTableHandler<S>,
        rate_limit_rps: Option<u32>,
        progress: CreateMviewProgressReporter,
    ) -> Self {
        let source_split_change_count = metrics
            .source_split_change_count
            .with_guarded_label_values(&[
                &stream_source_core.source_id.to_string(),
                &stream_source_core.source_name,
                &actor_ctx.id.to_string(),
                &actor_ctx.fragment_id.to_string(),
            ]);

        Self {
            actor_ctx,
            info,
            source_id: stream_source_core.source_id,
            source_name: stream_source_core.source_name,
            column_ids: stream_source_core.column_ids,
            source_desc_builder: stream_source_core.source_desc_builder,
            backfill_state_store,
            metrics,
            source_split_change_count,
            system_params,
            rate_limit_rps,
            progress,
        }
    }

    async fn build_stream_source_reader(
        &self,
        source_desc: &SourceDesc,
        splits: Vec<SplitImpl>,
    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txns
            },
            source_desc.source.config.clone(),
            None,
        );

        // We check the latest offsets ("watermarks") to decide whether we need to backfill at
        // all, e.g., for a Kafka topic-partition without any data, there is nothing to
        // backfill. Without this check, the executor could only learn that it's finished when
        // data comes in, which would be annoying for blocking DDL.

        let (stream, res) = source_desc
            .source
            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        Ok((
            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
            res.backfill_info,
        ))
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute(mut self, input: Executor) {
        let mut input = input.execute();

        // Poll the upstream to get the first barrier.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        let owned_splits = barrier
            .initial_split_assignment(self.actor_ctx.id)
            .unwrap_or(&[])
            .to_vec();

        let mut pause_control = PauseControl::new();
        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
            pause_control.backfill_pause();
        }
        if barrier.is_pause_on_startup() {
            pause_control.command_pause();
        }
        yield Message::Barrier(barrier);

        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // Source backfill only applies to Kafka, so we don't need to get Pulsar's
        // `message_id_data_idx`.
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        self.backfill_state_store.init_epoch(first_epoch).await?;

        let mut backfill_states: BackfillStates = HashMap::new();
        {
            let committed_reader = self
                .backfill_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for split in &owned_splits {
                let split_id = split.id();
                let backfill_state = committed_reader
                    .try_recover_from_state_store(&split_id)
                    .await?
                    .unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Backfilling(None),
                        num_consumed_rows: 0,
                        target_offset: None, // init with None
                    });
                backfill_states.insert(split_id, backfill_state);
            }
        }
        let mut backfill_stage = BackfillStage {
            states: backfill_states,
            splits: owned_splits,
        };
        backfill_stage.debug_assert_consistent();

        let (source_chunk_reader, backfill_info) = self
            .build_stream_source_reader(
                &source_desc,
                backfill_stage.get_latest_unfinished_splits()?,
            )
            .instrument_await("source_build_reader")
            .await?;
        for (split_id, info) in &backfill_info {
            let state = backfill_stage.states.get_mut(split_id).unwrap();
            match info {
                BackfillInfo::NoDataToBackfill => {
                    state.state = BackfillState::Finished;
                }
                BackfillInfo::HasDataToBackfill { latest_offset } => {
                    // Note: later we will override this with the offset from the source
                    // messages, and it may even become smaller than this value.
                    state.target_offset = Some(latest_offset.clone());
                }
            }
        }
        tracing::debug!(?backfill_stage, "source backfill started");

        fn select_strategy(_: &mut ()) -> PollNext {
            futures::stream::PollNext::Left
        }

        // We choose the "prefer upstream" strategy here, because:
        // - When the upstream source's workload is high (i.e., Kafka has new incoming data),
        //   this only makes backfilling slower: chunks from upstream are simply dropped, so
        //   there's not much overhead, and other running jobs are affected less.
        // - When the upstream source becomes less busy, the backfill can begin to catch up.
        let mut backfill_stream = select_with_strategy(
            input.by_ref().map(Either::Left),
            source_chunk_reader.map(Either::Right),
            select_strategy,
        );

        type PausedReader = Option<impl Stream>;
        let mut paused_reader: PausedReader = None;

        macro_rules! pause_reader {
            () => {
                if !pause_control.reader_paused {
                    let (left, right) = backfill_stream.into_inner();
                    backfill_stream = select_with_strategy(
                        left,
                        futures::stream::pending().boxed().map(Either::Right),
                        select_strategy,
                    );
                    // XXX: do we have to store the original reader? Can we simply rebuild the
                    // reader later?
                    paused_reader = Some(right);
                    pause_control.reader_paused = true;
                }
            };
        }

        macro_rules! resume_reader {
            () => {
                if pause_control.reader_paused {
                    backfill_stream = select_with_strategy(
                        input.by_ref().map(Either::Left),
                        paused_reader
                            .take()
                            .expect("should have paused reader to resume"),
                        select_strategy,
                    );
                    pause_control.reader_paused = false;
                }
            };
        }
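
        // How pausing works: the right-hand (backfill) arm of the select is swapped out for
        // `futures::stream::pending()`, which never yields, so only upstream messages keep
        // flowing. The original reader is stashed in `paused_reader`, so resuming doesn't have
        // to rebuild the connector stream.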

        if pause_control.is_paused() {
            pause_reader!();
        }

        let state_store = self
            .backfill_state_store
            .state_store()
            .state_store()
            .clone();
        let table_id = self.backfill_state_store.state_store().table_id();
        let mut state_table_initialized = false;
        {
            let source_backfill_row_count = self
                .metrics
                .source_backfill_row_count
                .with_guarded_label_values(&[
                    &self.source_id.to_string(),
                    &self.source_name,
                    &self.actor_ctx.id.to_string(),
                    &self.actor_ctx.fragment_id.to_string(),
                ]);

            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
            // milliseconds, considering some other latencies like network and cost in Meta.
            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                as u128
                * WAIT_BARRIER_MULTIPLE_TIMES;
            let mut last_barrier_time = Instant::now();

            // The main logic of the loop is in `handle_upstream_row` and `handle_backfill_row`.
            'backfill_loop: while let Some(either) = backfill_stream.next().await {
                match either {
                    // Upstream
                    Either::Left(msg) => {
                        let Ok(msg) = msg else {
                            let e = msg.unwrap_err();
                            tracing::error!(
                                error = ?e.as_report(),
                                source_id = %self.source_id,
                                "stream source reader error",
                            );
                            GLOBAL_ERROR_METRICS.user_source_error.report([
                                "SourceReaderError".to_owned(),
                                self.source_id.to_string(),
                                self.source_name.clone(),
                                self.actor_ctx.fragment_id.to_string(),
                            ]);

                            let (reader, _backfill_info) = self
                                .build_stream_source_reader(
                                    &source_desc,
                                    backfill_stage.get_latest_unfinished_splits()?,
                                )
                                .await?;

                            backfill_stream = select_with_strategy(
                                input.by_ref().map(Either::Left),
                                reader.map(Either::Right),
                                select_strategy,
                            );
                            continue;
                        };
                        match msg {
                            Message::Barrier(barrier) => {
                                last_barrier_time = Instant::now();

                                if pause_control.self_resume() {
                                    resume_reader!();
                                }

                                let mut maybe_mutation = None;
                                if let Some(mutation) = barrier.mutation.as_deref() {
                                    match mutation {
                                        Mutation::Pause => {
                                            // `pause_reader` must not be invoked more than
                                            // once consecutively.
                                            pause_control.command_pause();
                                            pause_reader!();
                                        }
                                        Mutation::Resume => {
                                            // `paused_reader.take` must not be invoked more
                                            // than once consecutively.
                                            if pause_control.command_resume() {
                                                resume_reader!();
                                            }
                                        }
                                        Mutation::StartFragmentBackfill { fragment_ids } => {
                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
                                                && pause_control.backfill_resume()
                                            {
                                                resume_reader!();
                                            }
                                        }
                                        Mutation::SourceChangeSplit(actor_splits) => {
                                            tracing::info!(
                                                actor_splits = ?actor_splits,
                                                "source change split received"
                                            );
                                            maybe_mutation = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| {
                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
                                                        target_splits,
                                                        should_trim_state: true,
                                                    }
                                                });
                                        }
                                        Mutation::Update(UpdateMutation {
                                            actor_splits, ..
                                        }) => {
                                            maybe_mutation = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| {
                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
                                                        target_splits,
                                                        should_trim_state: false,
                                                    }
                                                });
                                        }
                                        Mutation::ConnectorPropsChange(props_change) => {
                                            if let Some(props_plaintext) =
                                                props_change.get(&self.source_id.as_raw_id())
                                            {
                                                source_desc
                                                    .update_reader(props_plaintext.clone())?;

                                                maybe_mutation = Some(
                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
                                                );
                                            }
                                        }
                                        Mutation::Throttle(actor_to_apply) => {
                                            if let Some(new_rate_limit) =
                                                actor_to_apply.get(&self.actor_ctx.id)
                                                && *new_rate_limit != self.rate_limit_rps
                                            {
                                                tracing::info!(
                                                    "updating rate limit from {:?} to {:?}",
                                                    self.rate_limit_rps,
                                                    *new_rate_limit
                                                );
                                                self.rate_limit_rps = *new_rate_limit;
                                                // Rebuild the reader to apply the new rate
                                                // limit.
                                                let (reader, _backfill_info) = self
                                                    .build_stream_source_reader(
                                                        &source_desc,
                                                        backfill_stage
                                                            .get_latest_unfinished_splits()?,
                                                    )
                                                    .await?;

                                                backfill_stream = select_with_strategy(
                                                    input.by_ref().map(Either::Left),
                                                    reader.map(Either::Right),
                                                    select_strategy,
                                                );
                                            }
                                        }
                                        _ => {}
                                    }
                                }
                                async fn rebuild_reader_on_split_changed(
                                    this: &SourceBackfillExecutorInner<impl StateStore>,
                                    backfill_stage: &BackfillStage,
                                    source_desc: &SourceDesc,
                                ) -> StreamExecutorResult<BoxSourceChunkStream>
                                {
                                    // Rebuild `backfill_stream`.
                                    // Note: we don't put this in a method on `self`, due to
                                    // some complex lifetime issues.

                                    let latest_unfinished_splits =
                                        backfill_stage.get_latest_unfinished_splits()?;
                                    tracing::info!(
                                        "actor {:?} apply source split change to {:?}",
                                        this.actor_ctx.id,
                                        latest_unfinished_splits
                                    );

                                    // Replace the source reader with a new one reflecting the
                                    // new state.
                                    let (reader, _backfill_info) = this
                                        .build_stream_source_reader(
                                            source_desc,
                                            latest_unfinished_splits,
                                        )
                                        .await?;

                                    Ok(reader)
                                }

                                self.backfill_state_store
                                    .set_states(backfill_stage.states.clone())
                                    .await?;
                                self.backfill_state_store.commit(barrier.epoch).await?;

                                if self.should_report_finished(&backfill_stage.states) {
                                    // Drop the backfill Kafka consumers.
                                    backfill_stream = select_with_strategy(
                                        input.by_ref().map(Either::Left),
                                        futures::stream::pending().boxed().map(Either::Right),
                                        select_strategy,
                                    );

                                    self.progress.finish(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    let is_checkpoint = barrier.is_checkpoint();
                                    // Yield the barrier after reporting progress.
                                    yield Message::Barrier(barrier);

                                    if let Some(to_apply_mutation) = maybe_mutation {
                                        self.apply_split_change_after_yield_barrier(
                                            barrier_epoch,
                                            &mut backfill_stage,
                                            to_apply_mutation,
                                        )
                                        .await?;
                                    }

                                    if !state_table_initialized {
                                        if is_checkpoint {
                                            // This is for `self.backfill_finished()` to be
                                            // safe: wait until this actor can read all actors'
                                            // written data, i.e., until the 2nd checkpoint
                                            // epoch is committed.
                                            let epoch = barrier_epoch.prev;
                                            tracing::info!("waiting for epoch: {}", epoch);
                                            state_store
                                                .try_wait_epoch(
                                                    HummockReadEpoch::Committed(epoch),
                                                    TryWaitEpochOptions { table_id },
                                                )
                                                .await?;
                                            tracing::info!("finished waiting for epoch: {}", epoch);
                                            state_table_initialized = true;
                                        }
                                    } else {
                                        // Even after we report finished, we don't exit the
                                        // loop, because we still need to handle split
                                        // migration.
                                        assert!(
                                            state_table_initialized,
                                            "state table should be initialized before checking backfill finished"
                                        );
                                        if self.backfill_finished(&backfill_stage.states).await? {
                                            tracing::info!("source backfill finished");
                                            break 'backfill_loop;
                                        }
                                    }
                                } else {
                                    self.progress.update_for_source_backfill(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    // Yield the barrier after reporting progress.
                                    yield Message::Barrier(barrier);

                                    if let Some(to_apply_mutation) = maybe_mutation
                                        && self
                                            .apply_split_change_after_yield_barrier(
                                                barrier_epoch,
                                                &mut backfill_stage,
                                                to_apply_mutation,
                                            )
                                            .await?
                                    {
                                        let reader = rebuild_reader_on_split_changed(
                                            &self,
                                            &backfill_stage,
                                            &source_desc,
                                        )
                                        .await?;

                                        backfill_stream = select_with_strategy(
                                            input.by_ref().map(Either::Left),
                                            reader.map(Either::Right),
                                            select_strategy,
                                        );
                                    }
                                }
                            }
                            Message::Chunk(chunk) => {
                                // We need to iterate over all rows because there might be
                                // multiple splits in one chunk.
                                // Note: we assume offsets from the source are monotonically
                                // increasing for the algorithm to work correctly.
                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                                for (i, (_, row)) in chunk.rows().enumerate() {
                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                                    let vis = backfill_stage.handle_upstream_row(split, offset);
                                    new_vis.set(i, vis);
                                }
                                // Emit the chunk if the visibility is not empty, i.e., some
                                // splits have finished backfilling.
                                let new_vis = new_vis.finish();
                                if new_vis.count_ones() != 0 {
                                    let new_chunk = chunk.clone_with_vis(new_vis);
                                    yield Message::Chunk(new_chunk);
                                }
                            }
                            Message::Watermark(_) => {
                                // Ignore watermarks during backfill.
                            }
                        }
                    }
                    // Backfill
                    Either::Right(msg) => {
                        let chunk = msg?;

                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                            // Pause to let the barrier catch up via backpressure of the
                            // snapshot stream.
                            pause_control.self_pause();
                            pause_reader!();

                            // We exceeded the max wait barrier time, so the source will be
                            // paused. Currently we can guarantee the source is not already
                            // paused, since it just received stream chunks.
                            tracing::warn!(
                                "source {} paused, wait barrier for {:?}",
                                self.info.identity,
                                last_barrier_time.elapsed()
                            );

                            // Only update `max_wait_barrier_time_ms` to capture
                            // `barrier_interval_ms` changes here, to avoid frequently
                            // accessing the shared `system_params`.
                            max_wait_barrier_time_ms =
                                self.system_params.load().barrier_interval_ms() as u128
                                    * WAIT_BARRIER_MULTIPLE_TIMES;
                        }
                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                        for (i, (_, row)) in chunk.rows().enumerate() {
                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
                            new_vis.set(i, vis);
                        }

                        let new_vis = new_vis.finish();
                        let card = new_vis.count_ones();
                        if card != 0 {
                            let new_chunk = chunk.clone_with_vis(new_vis);
                            yield Message::Chunk(new_chunk);
                            source_backfill_row_count.inc_by(card as u64);
                        }
                    }
                }
            }
        }

        std::mem::drop(backfill_stream);
        let mut states = backfill_stage.states;
        // Make sure the `Finished` states are persisted.
        self.backfill_state_store.set_states(states.clone()).await?;

        // All splits have finished backfilling. Now we only forward the source data.
        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Barrier(barrier) => {
                    let mut split_changed = None;
                    if let Some(mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause | Mutation::Resume => {
                                // We don't need to do anything; this is handled by upstream.
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, true));
                            }
                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, false));
                            }
                            _ => {}
                        }
                    }
                    self.backfill_state_store.commit(barrier.epoch).await?;
                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((target_splits, states, should_trim_state)) = split_changed {
                        self.apply_split_change_forward_stage_after_yield_barrier(
                            barrier_epoch,
                            target_splits,
                            states,
                            should_trim_state,
                        )
                        .await?;
                    }
                }
                Message::Chunk(chunk) => {
                    yield Message::Chunk(chunk);
                }
                Message::Watermark(watermark) => {
                    yield Message::Watermark(watermark);
                }
            }
        }
    }

    /// Decides when we should call `progress.finish()` to let the blocking DDL return.
    /// We report as soon as every split is `SourceCachingUp` (or `Finished`); otherwise, the
    /// DDL might be blocked forever until upstream messages come in.
    ///
    /// Note: split migration (online scaling) is related to progress tracking.
    /// - For foreground DDL, scaling is not allowed before progress is finished.
    /// - For background DDL, scaling is skipped when progress is not finished, and can be
    ///   triggered by recreating actors during recovery.
    ///
    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
    fn should_report_finished(&self, states: &BackfillStates) -> bool {
        states.values().all(|state| {
            matches!(
                state.state,
                BackfillState::Finished | BackfillState::SourceCachingUp(_)
            )
        })
    }

    /// Returns whether all splits have entered the `Finished` state.
    ///
    /// We check all splits of the source here, including other actors' splits, before going to
    /// the forward stage. Otherwise, if we `break` early and an unfinished split is migrated
    /// to this actor after rescheduling, we would still need to backfill it.
    ///
    /// Note: at the beginning, the actor can only read the state written by itself. It needs
    /// to wait until it can read all actors' written data, i.e., until the second checkpoint
    /// is available.
    ///
    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
        Ok(states
            .values()
            .all(|state| matches!(state.state, BackfillState::Finished))
            && self
                .backfill_state_store
                .scan_may_stale()
                .await?
                .into_iter()
                .all(|state| matches!(state.state, BackfillState::Finished)))
    }

    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
    async fn apply_split_change_after_yield_barrier(
        &mut self,
        barrier_epoch: EpochPair,
        stage: &mut BackfillStage,
        to_apply_mutation: ApplyMutationAfterBarrier,
    ) -> StreamExecutorResult<bool> {
        match to_apply_mutation {
            ApplyMutationAfterBarrier::SourceChangeSplit {
                target_splits,
                should_trim_state,
            } => {
                self.source_split_change_count.inc();
                {
                    if self
                        .update_state_if_changed(
                            barrier_epoch,
                            target_splits,
                            stage,
                            should_trim_state,
                        )
                        .await?
                    {
                        // Note: we don't rebuild `backfill_stream` here, due to some complex
                        // lifetime issues.
                        Ok(true)
                    } else {
                        Ok(false)
                    }
                }
            }
            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
        }
    }

    /// Returns `true` if the splits changed, and `false` otherwise.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        stage: &mut BackfillStage,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;
        // Take out the old states (immutable; only used to build `target_state` and to check
        // for added/dropped splits). `stage.states` is set to `target_state` in the end.
        let old_states = std::mem::take(&mut stage.states);
        let committed_reader = self
            .backfill_state_store
            .new_committed_reader(barrier_epoch)
            .await?;
        // Iterate over the target (assigned) splits:
        // - check if any new splits are added
        // - build target_state
        for split in &target_splits {
            let split_id = split.id();
            if let Some(s) = old_states.get(&split_id) {
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;

                let backfill_state = committed_reader
                    .try_recover_from_state_store(&split_id)
                    .await?;
                match backfill_state {
                    None => {
                        // Newly added split. We don't need to backfill it.
                        // Note that this branch is different from the initial barrier
                        // (`BackfillState::Backfilling(None)` there).
                        target_state.insert(
                            split_id,
                            BackfillStateWithProgress {
                                state: BackfillState::Finished,
                                num_consumed_rows: 0,
                                target_offset: None,
                            },
                        );
                    }
                    Some(backfill_state) => {
                        // Migrated split. Backfill it if unfinished.
                        target_state.insert(split_id, backfill_state);
                    }
                }
            }
        }

        // Check for dropped splits.
        for existing_split_id in old_states.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            // Note: `stage.states` was taken out above, so the dropped splits must be derived
            // from `old_states`.
            let dropped_splits = old_states
                .keys()
                .filter(|split_id| !target_state.contains_key(*split_id))
                .cloned();

            if should_trim_state {
                // Trim the dropped splits' state.
                self.backfill_state_store.trim_state(dropped_splits).await?;
            }
            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
        } else {
            debug_assert_eq!(old_states, target_state);
        }
        stage.states = target_state;
        stage.splits = target_splits;
        stage.debug_assert_consistent();
        Ok(split_changed)
    }

    /// For a split change during the forward stage, all newly added splits should already be
    /// finished; we just need to update the state store if necessary.
    async fn apply_split_change_forward_stage_after_yield_barrier(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        states: &mut BackfillStates,
        should_trim_state: bool,
    ) -> StreamExecutorResult<()> {
        self.source_split_change_count.inc();
        {
            self.update_state_if_changed_forward_stage(
                barrier_epoch,
                target_splits,
                states,
                should_trim_state,
            )
            .await?;
        }

        Ok(())
    }

    async fn update_state_if_changed_forward_stage(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        states: &mut BackfillStates,
        should_trim_state: bool,
    ) -> StreamExecutorResult<()> {
        let target_splits: HashSet<SplitId> =
            target_splits.into_iter().map(|split| split.id()).collect();

        let mut split_changed = false;
        let mut newly_added_splits = vec![];

        let committed_reader = self
            .backfill_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Check for added splits.
        for split_id in &target_splits {
            if !states.contains_key(split_id) {
                split_changed = true;

                let backfill_state = committed_reader
                    .try_recover_from_state_store(split_id)
                    .await?;
                match &backfill_state {
                    None => {
                        // Newly added split. We don't need to backfill it!
                        newly_added_splits.push(split_id.clone());
                    }
                    Some(backfill_state) => {
                        // Migrated split. It should also be finished, since we are in the
                        // forward stage.
                        match backfill_state.state {
                            BackfillState::Finished => {}
                            _ => {
                                return Err(anyhow::anyhow!(
                                    "unexpected backfill state: {:?}",
                                    backfill_state
                                )
                                .into());
                            }
                        }
                    }
                }
                states.insert(
                    split_id.clone(),
                    backfill_state.unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Finished,
                        num_consumed_rows: 0,
                        target_offset: None,
                    }),
                );
            }
        }

        // Check for dropped splits.
        for existing_split_id in states.keys() {
            if !target_splits.contains(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                target_splits = ?target_splits,
                "apply split change"
            );

            let dropped_splits =
                states.extract_if(|split_id, _| !target_splits.contains(split_id));

            if should_trim_state {
                // Trim the dropped splits' state.
                self.backfill_state_store
                    .trim_state(dropped_splits.map(|(k, _v)| k))
                    .await?;
            }

            // For migrated and pre-existing splits, we do not need to update the state store;
            // only newly added splits need it.
            self.backfill_state_store
                .set_states(
                    newly_added_splits
                        .into_iter()
                        .map(|split_id| {
                            (
                                split_id,
                                BackfillStateWithProgress {
                                    state: BackfillState::Finished,
                                    num_consumed_rows: 0,
                                    target_offset: None,
                                },
                            )
                        })
                        .collect(),
                )
                .await?;
        }

        Ok(())
    }
}

/// Compares two Kafka offsets, assuming they are string-encoded integers (source backfill
/// currently only applies to Kafka). Panics if an offset is not numeric.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let a = a.parse::<i64>().unwrap();
    let b = b.parse::<i64>().unwrap();
    a.cmp(&b)
}

impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.inner.execute(self.input).boxed()
    }
}

impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceBackfillExecutor")
            .field("source_id", &self.source_id)
            .field("column_ids", &self.column_ids)
            .field("stream_key", &self.info.stream_key)
            .finish()
    }
}

struct PauseControl {
    /// Paused due to backfill order control.
    backfill_paused: bool,
    /// Paused by the executor itself, e.g., to let the barrier catch up.
    self_paused: bool,
    /// Paused due to a `Pause` command from meta, or `pause_on_next_bootstrap`.
    command_paused: bool,
    /// Whether the backfill reader stream is currently swapped out (paused).
    reader_paused: bool,
}

impl PauseControl {
    fn new() -> Self {
        Self {
            backfill_paused: false,
            self_paused: false,
            command_paused: false,
            reader_paused: false,
        }
    }

    fn is_paused(&self) -> bool {
        self.backfill_paused || self.command_paused || self.self_paused
    }

    /// Pauses due to backfill order control. The caller is responsible for pausing the reader
    /// afterwards.
    fn backfill_pause(&mut self) {
        if self.backfill_paused {
            tracing::warn!("backfill_pause invoked twice");
        }
        self.backfill_paused = true;
    }

    /// Returns whether we need to resume the reader.
    /// Same precedence as command.
    fn backfill_resume(&mut self) -> bool {
        if !self.backfill_paused {
            tracing::warn!("backfill_resume invoked while not paused");
        }
        self.backfill_paused = false;
        !self.command_paused
    }

    /// Pauses to let the barrier catch up. The caller is responsible for pausing the reader
    /// afterwards.
    fn self_pause(&mut self) {
        assert!(
            !self.backfill_paused,
            "backfill stream should not be read when backfill_pause is set"
        );
        assert!(
            !self.command_paused,
            "backfill stream should not be read when command_pause is set"
        );
        if self.self_paused {
            tracing::warn!("self_pause invoked twice");
        }
        self.self_paused = true;
    }

    /// Returns whether we need to resume the reader.
    /// `self_resume` has the lowest precedence: it can only resume if we are paused neither
    /// due to `backfill_paused` nor due to `command_paused`.
    fn self_resume(&mut self) -> bool {
        self.self_paused = false;
        !(self.backfill_paused || self.command_paused)
    }

    /// Pauses due to a command from meta. The caller is responsible for pausing the reader
    /// afterwards.
    fn command_pause(&mut self) {
        if self.command_paused {
            tracing::warn!("command_pause invoked twice");
        }
        self.command_paused = true;
    }

    /// Returns whether we need to resume the reader.
    /// Same precedence as backfill.
    fn command_resume(&mut self) -> bool {
        if !self.command_paused {
            tracing::warn!("command_resume invoked while not paused");
        }
        self.command_paused = false;
        !self.backfill_paused
    }
}
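
// Minimal test sketches for the pure helpers above, under the stated assumptions
// (string-encoded integer Kafka offsets; pause precedence as documented on `PauseControl`).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn backfill_state_json_roundtrip() {
        // The persisted state must survive a JSON round-trip for fail-over to work.
        let state = BackfillStateWithProgress {
            state: BackfillState::SourceCachingUp("42".to_owned()),
            num_consumed_rows: 100,
            target_offset: Some("41".to_owned()),
        };
        let restored =
            BackfillStateWithProgress::restore_from_json(state.clone().encode_to_json()).unwrap();
        assert_eq!(state, restored);
    }

    #[test]
    fn kafka_offset_comparison_is_numeric() {
        // Offsets are compared as integers, not lexicographically ("9" < "10").
        assert_eq!(compare_kafka_offset("9", "10"), Ordering::Less);
        assert_eq!(compare_kafka_offset("10", "10"), Ordering::Equal);
    }

    #[test]
    fn pause_precedence() {
        let mut pc = PauseControl::new();
        pc.command_pause();
        pc.backfill_pause();
        // Resuming the backfill pause must not resume the reader while a command pause is
        // still in effect.
        assert!(!pc.backfill_resume());
        assert!(pc.command_resume());
        assert!(!pc.is_paused());
    }
}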