risingwave_stream/executor/source/source_backfill_executor.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::{ColumnId, TableId};
25use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
26use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::types::JsonbVal;
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::{
32 BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
33 SplitMetaData,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use serde::{Deserialize, Serialize};
38use thiserror_ext::AsReport;
39
40use super::executor_core::StreamSourceCore;
41use super::source_backfill_state_table::BackfillStateTableHandler;
42use super::{apply_rate_limit, get_split_offset_col_idx};
43use crate::common::rate_limit::limited_chunk_size;
44use crate::executor::UpdateMutation;
45use crate::executor::prelude::*;
46use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
47use crate::task::CreateMviewProgressReporter;
48
49#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
50pub enum BackfillState {
51 /// `None` means not started yet. It's the initial state.
52 /// XXX: perhaps we can also set to low-watermark instead of `None`
53 Backfilling(Option<String>),
54 /// Backfill is stopped at this offset (inclusive). The source needs to filter out upstream messages whose offsets are at or before this offset.
55 SourceCachingUp(String),
56 Finished,
57}
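/// Backfill progress of each split assigned to this actor, keyed by split id.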
58pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
59
60 /// Only the `state` field is the real state used for failover.
61/// Other fields are for observability (but we still need to persist them).
62#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
63pub struct BackfillStateWithProgress {
64 pub state: BackfillState,
65 pub num_consumed_rows: u64,
66 /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
67 /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
68 /// so that we can finish backfilling even when upstream doesn't emit any data.
69 pub target_offset: Option<String>,
70}
71
72impl BackfillStateWithProgress {
73 pub fn encode_to_json(self) -> JsonbVal {
74 serde_json::to_value(self).unwrap().into()
75 }
76
77 pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
78 serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
79 }
80}
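
// Illustrative sketch: a `BackfillStateWithProgress` round-trips through the JSONB format
// that `encode_to_json` / `restore_from_json` use for persistence in the state table.
// Example only; not referenced by the executor logic.
#[cfg(test)]
mod backfill_state_serde_example {
    use super::*;

    #[test]
    fn roundtrip_through_jsonb() {
        let state = BackfillStateWithProgress {
            state: BackfillState::Backfilling(Some("42".to_owned())),
            num_consumed_rows: 100,
            target_offset: Some("100".to_owned()),
        };
        let encoded = state.clone().encode_to_json();
        let decoded = BackfillStateWithProgress::restore_from_json(encoded).unwrap();
        assert_eq!(state, decoded);
    }
}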
81
82pub struct SourceBackfillExecutor<S: StateStore> {
83 pub inner: SourceBackfillExecutorInner<S>,
84 /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
85 pub input: Executor,
86}
87
88pub struct SourceBackfillExecutorInner<S: StateStore> {
89 actor_ctx: ActorContextRef,
90 info: ExecutorInfo,
91
92 /// Streaming source for the external system.
93 source_id: TableId,
94 source_name: String,
95 column_ids: Vec<ColumnId>,
96 source_desc_builder: Option<SourceDescBuilder>,
97 backfill_state_store: BackfillStateTableHandler<S>,
98
99 /// Metrics for monitor.
100 metrics: Arc<StreamingMetrics>,
101 source_split_change_count: LabelGuardedIntCounter,
102
103 // /// Receiver of barrier channel.
104 // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
105 /// System parameter reader to read barrier interval
106 system_params: SystemParamsReaderRef,
107
108 /// Rate limit in rows/s.
109 rate_limit_rps: Option<u32>,
110
111 progress: CreateMviewProgressReporter,
112}
113
114/// Local variables used in the backfill stage.
115///
116/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
117///
118 /// Note: all of the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
119#[derive(Debug)]
120struct BackfillStage {
121 states: BackfillStates,
122 /// A copy of all splits (including unfinished and finished ones) assigned to the actor.
123 ///
124 /// Note: the offsets are not kept up to date. Use the offset from `states` to update a split before using it (see `get_latest_unfinished_splits`).
125 splits: Vec<SplitImpl>,
126}
127
128enum ApplyMutationAfterBarrier {
129 SourceChangeSplit {
130 target_splits: Vec<SplitImpl>,
131 should_trim_state: bool,
132 },
133 ConnectorPropsChange,
134}
135
136impl BackfillStage {
137 fn total_backfilled_rows(&self) -> u64 {
138 self.states.values().map(|s| s.num_consumed_rows).sum()
139 }
140
141 fn debug_assert_consistent(&self) {
142 if cfg!(debug_assertions) {
143 let all_splits: HashSet<_> =
144 self.splits.iter().map(|split| split.id().clone()).collect();
145 assert_eq!(
146 self.states.keys().cloned().collect::<HashSet<_>>(),
147 all_splits
148 );
149 }
150 }
151
152 /// Get unfinished splits with latest offsets according to the backfill states.
153 fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
154 let mut unfinished_splits = Vec::new();
155 for split in &self.splits {
156 let state = &self.states.get(split.id().as_ref()).unwrap().state;
157 match state {
158 BackfillState::Backfilling(Some(offset)) => {
159 let mut updated_split = split.clone();
160 updated_split.update_in_place(offset.clone())?;
161 unfinished_splits.push(updated_split);
162 }
163 BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
164 BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
165 }
166 }
167 Ok(unfinished_splits)
168 }
169
170 /// Updates backfill states and `target_offset`, and returns whether the row from the upstream `SourceExecutor` is visible.
171 fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
172 let mut vis = false;
173 let state = self.states.get_mut(split_id).unwrap();
174 let state_inner = &mut state.state;
175 match state_inner {
176 BackfillState::Backfilling(None) => {
177 // backfilling for this split is not started yet. Ignore this row
178 }
179 BackfillState::Backfilling(Some(backfill_offset)) => {
180 match compare_kafka_offset(backfill_offset, offset) {
181 Ordering::Less => {
182 // continue backfilling. Ignore this row
183 }
184 Ordering::Equal => {
185 // backfilling for this split finished exactly at this offset.
186 *state_inner = BackfillState::Finished;
187 }
188 Ordering::Greater => {
189 // backfilling for this split produced more data than current source's progress.
190 // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
191 *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
192 }
193 }
194 }
195 BackfillState::SourceCachingUp(backfill_offset) => {
196 match compare_kafka_offset(backfill_offset, offset) {
197 Ordering::Less => {
198 // Source caught up, but doesn't contain the last backfilled row.
199 // This may happen e.g., if Kafka performed compaction.
200 vis = true;
201 *state_inner = BackfillState::Finished;
202 }
203 Ordering::Equal => {
204 // Source just caught up with backfilling.
205 *state_inner = BackfillState::Finished;
206 }
207 Ordering::Greater => {
208 // Source is still behind backfilling.
209 }
210 }
211 }
212 BackfillState::Finished => {
213 vis = true;
214 // This split's backfilling is finished, we are waiting for other splits
215 }
216 }
217 if matches!(state_inner, BackfillState::Backfilling(_)) {
218 state.target_offset = Some(offset.to_owned());
219 }
220 if vis {
221 debug_assert_eq!(*state_inner, BackfillState::Finished);
222 }
223 vis
224 }
225
226 /// Updates backfill states and returns whether the row backfilled from the external system is visible.
227 fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
228 let state = self.states.get_mut(split_id).unwrap();
229 state.num_consumed_rows += 1;
230 let state_inner = &mut state.state;
231 match state_inner {
232 BackfillState::Backfilling(_old_offset) => {
233 if let Some(target_offset) = &state.target_offset
234 && compare_kafka_offset(offset, target_offset).is_ge()
235 {
236 // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
237 // and dropping duplicated messages.
238 // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
239 // In this case, upstream hasn't reached the target_offset yet.
240 //
241 // Note2: after this, all following rows in the current chunk will be invisible.
242 //
243 // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
244 // keep backfilling.
245 *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
246 } else {
247 *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
248 }
249 true
250 }
251 BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
252 // backfilling stopped. ignore
253 false
254 }
255 }
256 }
257}
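
// Illustrative sketch of the state transitions documented on `handle_upstream_row` /
// `handle_backfill_row`, for a single split with integer (Kafka-style) offsets.
// Example only; not referenced by the executor logic.
#[cfg(test)]
mod backfill_stage_transition_example {
    use super::*;

    #[test]
    fn upstream_catching_up_finishes_backfill() {
        let mut states = BackfillStates::new();
        states.insert(
            "0".into(),
            BackfillStateWithProgress {
                state: BackfillState::Backfilling(None),
                num_consumed_rows: 0,
                target_offset: None,
            },
        );
        let mut stage = BackfillStage {
            states,
            splits: vec![],
        };

        // A backfilled row is visible and advances the backfill offset.
        assert!(stage.handle_backfill_row("0", "10"));
        // An upstream row behind the backfill offset switches the split to `SourceCachingUp`
        // and stays invisible (it was already emitted by backfill).
        assert!(!stage.handle_upstream_row("0", "5"));
        // The upstream row at exactly the backfill offset finishes the split but is still dropped.
        assert!(!stage.handle_upstream_row("0", "10"));
        // From the next upstream row on, rows are forwarded.
        assert!(stage.handle_upstream_row("0", "11"));
        assert_eq!(
            stage.states.get("0").unwrap().state,
            BackfillState::Finished
        );
    }
}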
258
259impl<S: StateStore> SourceBackfillExecutorInner<S> {
260 #[expect(clippy::too_many_arguments)]
261 pub fn new(
262 actor_ctx: ActorContextRef,
263 info: ExecutorInfo,
264 stream_source_core: StreamSourceCore<S>,
265 metrics: Arc<StreamingMetrics>,
266 system_params: SystemParamsReaderRef,
267 backfill_state_store: BackfillStateTableHandler<S>,
268 rate_limit_rps: Option<u32>,
269 progress: CreateMviewProgressReporter,
270 ) -> Self {
271 let source_split_change_count = metrics
272 .source_split_change_count
273 .with_guarded_label_values(&[
274 &stream_source_core.source_id.to_string(),
275 &stream_source_core.source_name,
276 &actor_ctx.id.to_string(),
277 &actor_ctx.fragment_id.to_string(),
278 ]);
279
280 Self {
281 actor_ctx,
282 info,
283 source_id: stream_source_core.source_id,
284 source_name: stream_source_core.source_name,
285 column_ids: stream_source_core.column_ids,
286 source_desc_builder: stream_source_core.source_desc_builder,
287 backfill_state_store,
288 metrics,
289 source_split_change_count,
290 system_params,
291 rate_limit_rps,
292 progress,
293 }
294 }
295
296 async fn build_stream_source_reader(
297 &self,
298 source_desc: &SourceDesc,
299 splits: Vec<SplitImpl>,
300 ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
301 let column_ids = source_desc
302 .columns
303 .iter()
304 .map(|column_desc| column_desc.column_id)
305 .collect_vec();
306 let source_ctx = SourceContext::new(
307 self.actor_ctx.id,
308 self.source_id,
309 self.actor_ctx.fragment_id,
310 self.source_name.clone(),
311 source_desc.metrics.clone(),
312 SourceCtrlOpts {
313 chunk_size: limited_chunk_size(self.rate_limit_rps),
314 split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
315 },
316 source_desc.source.config.clone(),
317 None,
318 );
319
320 // We will check watermark to decide whether we need to backfill.
321 // e.g., when there's a Kafka topic-partition without any data,
322 // we don't need to backfill at all. But if we do not check here,
323 // the executor can only know it's finished when data comes in.
324 // For blocking DDL, this would be annoying.
325
326 let (stream, res) = source_desc
327 .source
328 .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
329 .await
330 .map_err(StreamExecutorError::connector_error)?;
331 Ok((
332 apply_rate_limit(stream, self.rate_limit_rps).boxed(),
333 res.backfill_info,
334 ))
335 }
336
337 #[try_stream(ok = Message, error = StreamExecutorError)]
338 async fn execute(mut self, input: Executor) {
339 let mut input = input.execute();
340
341 // Poll the upstream to get the first barrier.
342 let barrier = expect_first_barrier(&mut input).await?;
343 let first_epoch = barrier.epoch;
344 let owned_splits = barrier
345 .initial_split_assignment(self.actor_ctx.id)
346 .unwrap_or(&[])
347 .to_vec();
348
349 let mut pause_control = PauseControl::new();
350 if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
351 pause_control.backfill_pause();
352 }
353 if barrier.is_pause_on_startup() {
354 pause_control.command_pause();
355 }
356 yield Message::Barrier(barrier);
357
358 let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
359 let mut source_desc = source_desc_builder
360 .build()
361 .map_err(StreamExecutorError::connector_error)?;
362 let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
363 else {
364 unreachable!("Partition and offset columns must be set.");
365 };
366
367 self.backfill_state_store.init_epoch(first_epoch).await?;
368
369 let mut backfill_states: BackfillStates = HashMap::new();
370 {
371 let committed_reader = self
372 .backfill_state_store
373 .new_committed_reader(first_epoch)
374 .await?;
375 for split in &owned_splits {
376 let split_id = split.id();
377 let backfill_state = committed_reader
378 .try_recover_from_state_store(&split_id)
379 .await?
380 .unwrap_or(BackfillStateWithProgress {
381 state: BackfillState::Backfilling(None),
382 num_consumed_rows: 0,
383 target_offset: None, // init with None
384 });
385 backfill_states.insert(split_id, backfill_state);
386 }
387 }
388 let mut backfill_stage = BackfillStage {
389 states: backfill_states,
390 splits: owned_splits,
391 };
392 backfill_stage.debug_assert_consistent();
393
394 let (source_chunk_reader, backfill_info) = self
395 .build_stream_source_reader(
396 &source_desc,
397 backfill_stage.get_latest_unfinished_splits()?,
398 )
399 .instrument_await("source_build_reader")
400 .await?;
401 for (split_id, info) in &backfill_info {
402 let state = backfill_stage.states.get_mut(split_id).unwrap();
403 match info {
404 BackfillInfo::NoDataToBackfill => {
405 state.state = BackfillState::Finished;
406 }
407 BackfillInfo::HasDataToBackfill { latest_offset } => {
408 // Note: later we will override it with the offset from the source message, and it may become smaller than this value.
409 state.target_offset = Some(latest_offset.clone());
410 }
411 }
412 }
413 tracing::debug!(?backfill_stage, "source backfill started");
414
415 fn select_strategy(_: &mut ()) -> PollNext {
416 futures::stream::PollNext::Left
417 }
418
419 // We choose the "prefer upstream" strategy here, because:
420 // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
421 // For chunks from upstream, they are simply dropped, so there's not much overhead.
422 // This likely also affects other running jobs less.
423 // - When the upstream source becomes less busy, SourceBackfill can begin to catch up.
424 let mut backfill_stream = select_with_strategy(
425 input.by_ref().map(Either::Left),
426 source_chunk_reader.map(Either::Right),
427 select_strategy,
428 );
429
430 type PausedReader = Option<impl Stream>;
431 let mut paused_reader: PausedReader = None;
432
433 macro_rules! pause_reader {
434 () => {
435 if !pause_control.reader_paused {
436 let (left, right) = backfill_stream.into_inner();
437 backfill_stream = select_with_strategy(
438 left,
439 futures::stream::pending().boxed().map(Either::Right),
440 select_strategy,
441 );
442 // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
443 paused_reader = Some(right);
444 pause_control.reader_paused = true;
445 }
446 };
447 }
448
449 macro_rules! resume_reader {
450 () => {
451 if pause_control.reader_paused {
452 backfill_stream = select_with_strategy(
453 input.by_ref().map(Either::Left),
454 paused_reader
455 .take()
456 .expect("should have paused reader to resume"),
457 select_strategy,
458 );
459 pause_control.reader_paused = false;
460 }
461 };
462 }
463
464 if pause_control.is_paused() {
465 pause_reader!();
466 }
467
468 let state_store = self
469 .backfill_state_store
470 .state_store()
471 .state_store()
472 .clone();
473 let table_id = self.backfill_state_store.state_store().table_id().into();
474 let mut state_table_initialized = false;
475 {
476 let source_backfill_row_count = self
477 .metrics
478 .source_backfill_row_count
479 .with_guarded_label_values(&[
480 &self.source_id.to_string(),
481 &self.source_name,
482 &self.actor_ctx.id.to_string(),
483 &self.actor_ctx.fragment_id.to_string(),
484 ]);
485
486 // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `barrier_interval_ms`
487 // milliseconds, considering some other latencies like network and cost in Meta.
488 let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
489 as u128
490 * WAIT_BARRIER_MULTIPLE_TIMES;
491 let mut last_barrier_time = Instant::now();
492
493 // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
494 'backfill_loop: while let Some(either) = backfill_stream.next().await {
495 match either {
496 // Upstream
497 Either::Left(msg) => {
498 let Ok(msg) = msg else {
499 let e = msg.unwrap_err();
500 tracing::warn!(
501 error = ?e.as_report(),
502 source_id = %self.source_id,
503 "stream source reader error",
504 );
505 GLOBAL_ERROR_METRICS.user_source_error.report([
506 "SourceReaderError".to_owned(),
507 self.source_id.to_string(),
508 self.source_name.to_owned(),
509 self.actor_ctx.fragment_id.to_string(),
510 ]);
511
512 let (reader, _backfill_info) = self
513 .build_stream_source_reader(
514 &source_desc,
515 backfill_stage.get_latest_unfinished_splits()?,
516 )
517 .await?;
518
519 backfill_stream = select_with_strategy(
520 input.by_ref().map(Either::Left),
521 reader.map(Either::Right),
522 select_strategy,
523 );
524 continue;
525 };
526 match msg {
527 Message::Barrier(barrier) => {
528 last_barrier_time = Instant::now();
529
530 if pause_control.self_resume() {
531 resume_reader!();
532 }
533
534 let mut maybe_mutation = None;
535 if let Some(ref mutation) = barrier.mutation.as_deref() {
536 match mutation {
537 Mutation::Pause => {
538 // `pause_reader!` should not be invoked more than once consecutively.
539 pause_control.command_pause();
540 pause_reader!();
541 }
542 Mutation::Resume => {
543 // `paused_reader.take()` should not be invoked more than once consecutively.
544 if pause_control.command_resume() {
545 resume_reader!();
546 }
547 }
548 Mutation::StartFragmentBackfill { fragment_ids } => {
549 if fragment_ids.contains(&self.actor_ctx.fragment_id)
550 && pause_control.backfill_resume()
551 {
552 resume_reader!();
553 }
554 }
555 Mutation::SourceChangeSplit(actor_splits) => {
556 tracing::info!(
557 actor_splits = ?actor_splits,
558 "source change split received"
559 );
560 maybe_mutation = actor_splits
561 .get(&self.actor_ctx.id)
562 .cloned()
563 .map(|target_splits| {
564 ApplyMutationAfterBarrier::SourceChangeSplit {
565 target_splits,
566 should_trim_state: true,
567 }
568 });
569 }
570 Mutation::Update(UpdateMutation {
571 actor_splits, ..
572 }) => {
573 maybe_mutation = actor_splits
574 .get(&self.actor_ctx.id)
575 .cloned()
576 .map(|target_splits| {
577 ApplyMutationAfterBarrier::SourceChangeSplit {
578 target_splits,
579 should_trim_state: false,
580 }
581 });
582 }
583 Mutation::ConnectorPropsChange(new_props) => {
584 if let Some(props_plaintext) =
585 new_props.get(&self.source_id.table_id())
586 {
587 source_desc
588 .update_reader(props_plaintext.clone())?;
589
590 maybe_mutation = Some(
591 ApplyMutationAfterBarrier::ConnectorPropsChange,
592 );
593 }
594 }
595 Mutation::Throttle(actor_to_apply) => {
596 if let Some(new_rate_limit) =
597 actor_to_apply.get(&self.actor_ctx.id)
598 && *new_rate_limit != self.rate_limit_rps
599 {
600 tracing::info!(
601 "updating rate limit from {:?} to {:?}",
602 self.rate_limit_rps,
603 *new_rate_limit
604 );
605 self.rate_limit_rps = *new_rate_limit;
606 // rebuild reader
607 let (reader, _backfill_info) = self
608 .build_stream_source_reader(
609 &source_desc,
610 backfill_stage
611 .get_latest_unfinished_splits()?,
612 )
613 .await?;
614
615 backfill_stream = select_with_strategy(
616 input.by_ref().map(Either::Left),
617 reader.map(Either::Right),
618 select_strategy,
619 );
620 }
621 }
622 _ => {}
623 }
624 }
625 async fn rebuild_reader_on_split_changed(
626 this: &SourceBackfillExecutorInner<impl StateStore>,
627 backfill_stage: &BackfillStage,
628 source_desc: &SourceDesc,
629 ) -> StreamExecutorResult<BoxSourceChunkStream>
630 {
631 // rebuild backfill_stream
632 // Note: we don't put this part in a method, due to some complex lifetime issues.
633
634 let latest_unfinished_splits =
635 backfill_stage.get_latest_unfinished_splits()?;
636 tracing::info!(
637 "actor {:?} apply source split change to {:?}",
638 this.actor_ctx.id,
639 latest_unfinished_splits
640 );
641
642 // Replace the source reader with a new one of the new state.
643 let (reader, _backfill_info) = this
644 .build_stream_source_reader(
645 source_desc,
646 latest_unfinished_splits,
647 )
648 .await?;
649
650 Ok(reader)
651 }
652
653 self.backfill_state_store
654 .set_states(backfill_stage.states.clone())
655 .await?;
656 self.backfill_state_store.commit(barrier.epoch).await?;
657
658 if self.should_report_finished(&backfill_stage.states) {
659 // drop the backfill kafka consumers
660 backfill_stream = select_with_strategy(
661 input.by_ref().map(Either::Left),
662 futures::stream::pending().boxed().map(Either::Right),
663 select_strategy,
664 );
665
666 self.progress.finish(
667 barrier.epoch,
668 backfill_stage.total_backfilled_rows(),
669 );
670
671 let barrier_epoch = barrier.epoch;
672 let is_checkpoint = barrier.is_checkpoint();
673 // yield barrier after reporting progress
674 yield Message::Barrier(barrier);
675
676 if let Some(to_apply_mutation) = maybe_mutation {
677 self.apply_split_change_after_yield_barrier(
678 barrier_epoch,
679 &mut backfill_stage,
680 to_apply_mutation,
681 )
682 .await?;
683 }
684
685 if !state_table_initialized {
686 if is_checkpoint {
687 // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
688 // We wait for 2nd epoch
689 let epoch = barrier_epoch.prev;
690 tracing::info!("waiting for epoch: {}", epoch);
691 state_store
692 .try_wait_epoch(
693 HummockReadEpoch::Committed(epoch),
694 TryWaitEpochOptions { table_id },
695 )
696 .await?;
697 tracing::info!("finished waiting for epoch: {}", epoch);
698 state_table_initialized = true;
699 }
700 } else {
701 // After we reported finished, we still don't exit the loop.
702 // Because we need to handle split migration.
703 assert!(
704 state_table_initialized,
705 "state table should be initialized before checking backfill finished"
706 );
707 if self.backfill_finished(&backfill_stage.states).await? {
708 tracing::info!("source backfill finished");
709 break 'backfill_loop;
710 }
711 }
712 } else {
713 self.progress.update_for_source_backfill(
714 barrier.epoch,
715 backfill_stage.total_backfilled_rows(),
716 );
717
718 let barrier_epoch = barrier.epoch;
719 // yield barrier after reporting progress
720 yield Message::Barrier(barrier);
721
722 if let Some(to_apply_mutation) = maybe_mutation {
723 if self
724 .apply_split_change_after_yield_barrier(
725 barrier_epoch,
726 &mut backfill_stage,
727 to_apply_mutation,
728 )
729 .await?
730 {
731 let reader = rebuild_reader_on_split_changed(
732 &self,
733 &backfill_stage,
734 &source_desc,
735 )
736 .await?;
737
738 backfill_stream = select_with_strategy(
739 input.by_ref().map(Either::Left),
740 reader.map(Either::Right),
741 select_strategy,
742 );
743 }
744 }
745 }
746 }
747 Message::Chunk(chunk) => {
748 // We need to iterate over all rows because there might be multiple splits in a chunk.
749 // Note: we assume the offset from the source is monotonically increasing for the algorithm to work correctly.
750 let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
751
752 for (i, (_, row)) in chunk.rows().enumerate() {
753 let split = row.datum_at(split_idx).unwrap().into_utf8();
754 let offset = row.datum_at(offset_idx).unwrap().into_utf8();
755 let vis = backfill_stage.handle_upstream_row(split, offset);
756 new_vis.set(i, vis);
757 }
758 // Emit the chunk if the visibility bitmap is not all-false, i.e., some splits have finished backfilling.
759 let new_vis = new_vis.finish();
760 if new_vis.count_ones() != 0 {
761 let new_chunk = chunk.clone_with_vis(new_vis);
762 yield Message::Chunk(new_chunk);
763 }
764 }
765 Message::Watermark(_) => {
766 // Ignore watermark during backfill.
767 }
768 }
769 }
770 // backfill
771 Either::Right(msg) => {
772 let chunk = msg?;
773
774 if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
775 // Pause to let the barrier catch up via backpressure on the backfill stream.
776 pause_control.self_pause();
777 pause_reader!();
778
779 // The max wait barrier time is exceeded, so the source is paused.
780 // We can guarantee the source was not already paused,
781 // since it has just received stream chunks.
783 tracing::warn!(
784 "source {} paused, wait barrier for {:?}",
785 self.info.identity,
786 last_barrier_time.elapsed()
787 );
788
789 // Only update `max_wait_barrier_time_ms` here to capture `barrier_interval_ms` changes,
790 // avoiding frequent access to the shared `system_params`.
793 max_wait_barrier_time_ms =
794 self.system_params.load().barrier_interval_ms() as u128
795 * WAIT_BARRIER_MULTIPLE_TIMES;
796 }
797 let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());
798
799 for (i, (_, row)) in chunk.rows().enumerate() {
800 let split_id = row.datum_at(split_idx).unwrap().into_utf8();
801 let offset = row.datum_at(offset_idx).unwrap().into_utf8();
802 let vis = backfill_stage.handle_backfill_row(split_id, offset);
803 new_vis.set(i, vis);
804 }
805
806 let new_vis = new_vis.finish();
807 let card = new_vis.count_ones();
808 if card != 0 {
809 let new_chunk = chunk.clone_with_vis(new_vis);
810 yield Message::Chunk(new_chunk);
811 source_backfill_row_count.inc_by(card as u64);
812 }
813 }
814 }
815 }
816 }
817
818 std::mem::drop(backfill_stream);
819 let mut states = backfill_stage.states;
820 // Make sure `Finished` state is persisted.
821 self.backfill_state_store.set_states(states.clone()).await?;
822
823 // All splits finished backfilling. Now we only forward the source data.
824 #[for_await]
825 for msg in input {
826 let msg = msg?;
827 match msg {
828 Message::Barrier(barrier) => {
829 let mut split_changed = None;
830 if let Some(ref mutation) = barrier.mutation.as_deref() {
831 match mutation {
832 Mutation::Pause | Mutation::Resume => {
833 // We don't need to do anything. Handled by upstream.
834 }
835 Mutation::SourceChangeSplit(actor_splits) => {
836 tracing::info!(
837 actor_splits = ?actor_splits,
838 "source change split received"
839 );
840 split_changed = actor_splits
841 .get(&self.actor_ctx.id)
842 .cloned()
843 .map(|target_splits| (target_splits, &mut states, true));
844 }
845 Mutation::Update(UpdateMutation { actor_splits, .. }) => {
846 split_changed = actor_splits
847 .get(&self.actor_ctx.id)
848 .cloned()
849 .map(|target_splits| (target_splits, &mut states, false));
850 }
851 _ => {}
852 }
853 }
854 self.backfill_state_store.commit(barrier.epoch).await?;
855 let barrier_epoch = barrier.epoch;
856 yield Message::Barrier(barrier);
857
858 if let Some((target_splits, state, should_trim_state)) = split_changed {
859 self.apply_split_change_forward_stage_after_yield_barrier(
860 barrier_epoch,
861 target_splits,
862 state,
863 should_trim_state,
864 )
865 .await?;
866 }
867 }
868 Message::Chunk(chunk) => {
869 yield Message::Chunk(chunk);
870 }
871 Message::Watermark(watermark) => {
872 yield Message::Watermark(watermark);
873 }
874 }
875 }
876 }
877
878 /// Whether we should call `progress.finish()` to let blocking DDL return.
879 /// We report as soon as the state becomes `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
880 ///
881 /// Note: split migration (online scaling) is related to progress tracking.
882 /// - For foreground DDL, scaling is not allowed before progress is finished.
883 /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
884 ///
885 /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
886 fn should_report_finished(&self, states: &BackfillStates) -> bool {
887 states.values().all(|state| {
888 matches!(
889 state.state,
890 BackfillState::Finished | BackfillState::SourceCachingUp(_)
891 )
892 })
893 }
894
895 /// All splits entered `Finished` state.
896 ///
897 /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
898 /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
899 /// this actor, we still need to backfill it.
900 ///
901 /// Note: at the beginning, the actor will only read the state written by itself.
902 /// It needs to _wait until it can read all actors' written data_.
903 /// i.e., wait until the second checkpoint is available.
904 ///
905 /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
906 async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
907 Ok(states
908 .values()
909 .all(|state| matches!(state.state, BackfillState::Finished))
910 && self
911 .backfill_state_store
912 .scan_may_stale()
913 .await?
914 .into_iter()
915 .all(|state| matches!(state.state, BackfillState::Finished)))
916 }
917
918 /// For newly added splits, we do not need to backfill and can directly forward from upstream.
919 async fn apply_split_change_after_yield_barrier(
920 &mut self,
921 barrier_epoch: EpochPair,
922 stage: &mut BackfillStage,
923 to_apply_mutation: ApplyMutationAfterBarrier,
924 ) -> StreamExecutorResult<bool> {
925 match to_apply_mutation {
926 ApplyMutationAfterBarrier::SourceChangeSplit {
927 target_splits,
928 should_trim_state,
929 } => {
930 self.source_split_change_count.inc();
931 {
932 if self
933 .update_state_if_changed(
934 barrier_epoch,
935 target_splits,
936 stage,
937 should_trim_state,
938 )
939 .await?
940 {
941 // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
942 Ok(true)
943 } else {
944 Ok(false)
945 }
946 }
947 }
948 ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
949 }
950 }
951
952 /// Returns `true` if split changed. Otherwise `false`.
953 async fn update_state_if_changed(
954 &mut self,
955 barrier_epoch: EpochPair,
956 target_splits: Vec<SplitImpl>,
957 stage: &mut BackfillStage,
958 should_trim_state: bool,
959 ) -> StreamExecutorResult<bool> {
960 let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
961
962 let mut split_changed = false;
963 // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
964 // Will be set to target_state in the end.
965 let old_states = std::mem::take(&mut stage.states);
966 let committed_reader = self
967 .backfill_state_store
968 .new_committed_reader(barrier_epoch)
969 .await?;
970 // Iterate over the target (assigned) splits
971 // - check if any new splits are added
972 // - build target_state
973 for split in &target_splits {
974 let split_id = split.id();
975 if let Some(s) = old_states.get(&split_id) {
976 target_state.insert(split_id, s.clone());
977 } else {
978 split_changed = true;
979
980 let backfill_state = committed_reader
981 .try_recover_from_state_store(&split_id)
982 .await?;
983 match backfill_state {
984 None => {
985 // Newly added split. We don't need to backfill.
986 // Note that this branch is different from the initial barrier, where the state is `BackfillState::Backfilling(None)`.
987 target_state.insert(
988 split_id,
989 BackfillStateWithProgress {
990 state: BackfillState::Finished,
991 num_consumed_rows: 0,
992 target_offset: None,
993 },
994 );
995 }
996 Some(backfill_state) => {
997 // Migrated split. Backfill if unfinished.
998 target_state.insert(split_id, backfill_state);
999 }
1000 }
1001 }
1002 }
1003
1004 // Checks dropped splits
1005 for existing_split_id in old_states.keys() {
1006 if !target_state.contains_key(existing_split_id) {
1007 tracing::info!("split dropping detected: {}", existing_split_id);
1008 split_changed = true;
1009 }
1010 }
1011
1012 if split_changed {
1013 let dropped_splits = stage
1014 .states
1015 .extract_if(|split_id, _| !target_state.contains_key(split_id))
1016 .map(|(split_id, _)| split_id);
1017
1018 if should_trim_state {
1019 // trim dropped splits' state
1020 self.backfill_state_store.trim_state(dropped_splits).await?;
1021 }
1022 tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
1023 } else {
1024 debug_assert_eq!(old_states, target_state);
1025 }
1026 stage.states = target_state;
1027 stage.splits = target_splits;
1028 stage.debug_assert_consistent();
1029 Ok(split_changed)
1030 }
1031
1032 /// For a split change during the forward stage, all newly added splits should already be finished.
1033 /// We just need to update the state store if necessary.
1034 async fn apply_split_change_forward_stage_after_yield_barrier(
1035 &mut self,
1036 barrier_epoch: EpochPair,
1037 target_splits: Vec<SplitImpl>,
1038 states: &mut BackfillStates,
1039 should_trim_state: bool,
1040 ) -> StreamExecutorResult<()> {
1041 self.source_split_change_count.inc();
1042 {
1043 self.update_state_if_changed_forward_stage(
1044 barrier_epoch,
1045 target_splits,
1046 states,
1047 should_trim_state,
1048 )
1049 .await?;
1050 }
1051
1052 Ok(())
1053 }
1054
1055 async fn update_state_if_changed_forward_stage(
1056 &mut self,
1057 barrier_epoch: EpochPair,
1058 target_splits: Vec<SplitImpl>,
1059 states: &mut BackfillStates,
1060 should_trim_state: bool,
1061 ) -> StreamExecutorResult<()> {
1062 let target_splits: HashSet<SplitId> = target_splits
1063 .into_iter()
1064 .map(|split| (split.id()))
1065 .collect();
1066
1067 let mut split_changed = false;
1068 let mut newly_added_splits = vec![];
1069
1070 let committed_reader = self
1071 .backfill_state_store
1072 .new_committed_reader(barrier_epoch)
1073 .await?;
1074
1075 // Checks added splits
1076 for split_id in &target_splits {
1077 if !states.contains_key(split_id) {
1078 split_changed = true;
1079
1080 let backfill_state = committed_reader
1081 .try_recover_from_state_store(split_id)
1082 .await?;
1083 match &backfill_state {
1084 None => {
1085 // Newly added split. We don't need to backfill!
1086 newly_added_splits.push(split_id.clone());
1087 }
1088 Some(backfill_state) => {
1089 // Migrated split. It should also be finished since we are in forwarding stage.
1090 match backfill_state.state {
1091 BackfillState::Finished => {}
1092 _ => {
1093 return Err(anyhow::anyhow!(
1094 "Unexpected backfill state: {:?}",
1095 backfill_state
1096 )
1097 .into());
1098 }
1099 }
1100 }
1101 }
1102 states.insert(
1103 split_id.clone(),
1104 backfill_state.unwrap_or(BackfillStateWithProgress {
1105 state: BackfillState::Finished,
1106 num_consumed_rows: 0,
1107 target_offset: None,
1108 }),
1109 );
1110 }
1111 }
1112
1113 // Checks dropped splits
1114 for existing_split_id in states.keys() {
1115 if !target_splits.contains(existing_split_id) {
1116 tracing::info!("split dropping detected: {}", existing_split_id);
1117 split_changed = true;
1118 }
1119 }
1120
1121 if split_changed {
1122 tracing::info!(
1123 target_splits = ?target_splits,
1124 "apply split change"
1125 );
1126
1127 let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1128
1129 if should_trim_state {
1130 // trim dropped splits' state
1131 self.backfill_state_store
1132 .trim_state(dropped_splits.map(|(k, _v)| k))
1133 .await?;
1134 }
1135
1136 // We only need to update the state store for newly added splits;
1137 // migrated and existing splits are already persisted.
1138 self.backfill_state_store
1139 .set_states(
1140 newly_added_splits
1141 .into_iter()
1142 .map(|split_id| {
1143 (
1144 split_id,
1145 BackfillStateWithProgress {
1146 state: BackfillState::Finished,
1147 num_consumed_rows: 0,
1148 target_offset: None,
1149 },
1150 )
1151 })
1152 .collect(),
1153 )
1154 .await?;
1155 }
1156
1157 Ok(())
1158 }
1159}
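
// Illustrative sketch of the "prefer upstream (left)" polling strategy used when building
// `backfill_stream`: while the preferred (left) stream has items ready, they are yielded
// before the other stream is polled. Example only; it assumes a tokio test runtime, as used
// by other tests in this crate, and is not referenced by the executor logic.
#[cfg(test)]
mod prefer_left_polling_example {
    use futures::StreamExt;
    use futures::stream::{self, PollNext, select_with_strategy};

    #[tokio::test]
    async fn left_is_preferred() {
        let upstream = stream::iter([1, 2]);
        let backfill = stream::iter([10, 20]);
        // With a constant `PollNext::Left` strategy, the left stream is drained first.
        let merged: Vec<i32> =
            select_with_strategy(upstream, backfill, |_: &mut ()| PollNext::Left)
                .collect()
                .await;
        assert_eq!(merged, vec![1, 2, 10, 20]);
    }
}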
1160
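/// Compares two offsets numerically (Kafka offsets are integers encoded as strings).
///
/// Panics if either offset cannot be parsed as an `i64`.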
1161fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
1162 let a = a.parse::<i64>().unwrap();
1163 let b = b.parse::<i64>().unwrap();
1164 a.cmp(&b)
1165}
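
// Illustrative sketch: offsets are compared numerically rather than lexicographically,
// so "9" sorts before "10". Example only; not referenced by the executor logic.
#[cfg(test)]
mod kafka_offset_order_example {
    use std::cmp::Ordering;

    use super::compare_kafka_offset;

    #[test]
    fn numeric_not_lexicographic() {
        assert_eq!(compare_kafka_offset("9", "10"), Ordering::Less);
        assert_eq!(compare_kafka_offset("10", "10"), Ordering::Equal);
        assert_eq!(compare_kafka_offset("11", "10"), Ordering::Greater);
    }
}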
1166
1167impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
1168 fn execute(self: Box<Self>) -> BoxedMessageStream {
1169 self.inner.execute(self.input).boxed()
1170 }
1171}
1172
1173impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1174 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1175 f.debug_struct("SourceBackfillExecutor")
1176 .field("source_id", &self.source_id)
1177 .field("column_ids", &self.column_ids)
1178 .field("pk_indices", &self.info.pk_indices)
1179 .finish()
1180 }
1181}
1182
1183struct PauseControl {
1184 // Paused due to backfill order control
1185 backfill_paused: bool,
1186 // Paused due to self-pause, e.g. let barrier catch up
1187 self_paused: bool,
1188 // Paused due to Pause command from meta, pause_on_next_bootstrap
1189 command_paused: bool,
1190 // reader paused
1191 reader_paused: bool,
1192}
1193
1194impl PauseControl {
1195 fn new() -> Self {
1196 Self {
1197 backfill_paused: false,
1198 self_paused: false,
1199 command_paused: false,
1200 reader_paused: false,
1201 }
1202 }
1203
1204 fn is_paused(&self) -> bool {
1205 self.backfill_paused || self.command_paused || self.self_paused
1206 }
1207
1208 /// Marks the stream as paused by backfill order control. The caller should then pause the reader.
1209 fn backfill_pause(&mut self) {
1210 if self.backfill_paused {
1211 tracing::warn!("backfill_pause invoked twice");
1212 }
1213 self.backfill_paused = true;
1214 }
1215
1216 /// returns whether we need to resume the reader.
1217 /// same precedence as command.
1218 fn backfill_resume(&mut self) -> bool {
1219 if !self.backfill_paused {
1220 tracing::warn!("backfill_resume invoked twice");
1221 }
1222 !self.command_paused
1223 }
1224
1225 /// Marks the stream as self-paused (to let the barrier catch up). The caller should then pause the reader.
1226 fn self_pause(&mut self) {
1227 assert!(
1228 !self.backfill_paused,
1229 "backfill stream should not be read when backfill_pause is set"
1230 );
1231 assert!(
1232 !self.command_paused,
1233 "backfill stream should not be read when command_pause is set"
1234 );
1235 if self.self_paused {
1236 tracing::warn!("self_pause invoked twice");
1237 }
1238 self.self_paused = true;
1239 }
1240
1241 /// returns whether we need to resume the reader.
1242 /// `self_resume` has the lowest precedence,
1243 /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
1244 fn self_resume(&mut self) -> bool {
1245 self.self_paused = false;
1246 !(self.backfill_paused || self.command_paused)
1247 }
1248
1249 /// Marks the stream as paused by a `Pause` command from meta. The caller should then pause the reader.
1250 fn command_pause(&mut self) {
1251 if self.command_paused {
1252 tracing::warn!("command_pause invoked twice");
1253 }
1254 self.command_paused = true;
1255 }
1256
1257 /// returns whether we need to resume the reader.
1258 /// same precedence as backfill.
1259 fn command_resume(&mut self) -> bool {
1260 if !self.command_paused {
1261 tracing::warn!("command_resume invoked twice");
1262 }
1263 self.command_paused = false;
1264 !self.backfill_paused
1265 }
1266}
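
// Illustrative sketch of the pause precedence documented on `PauseControl`: a self-triggered
// resume must not resume the reader while a command pause is still in effect, while a plain
// self-pause is lifted on the next barrier. Example only; not referenced by the executor logic.
#[cfg(test)]
mod pause_control_precedence_example {
    use super::PauseControl;

    #[test]
    fn self_resume_respects_command_pause() {
        let mut ctrl = PauseControl::new();

        ctrl.command_pause();
        assert!(ctrl.is_paused());
        // A self-triggered resume (checked on every barrier) must not resume the reader
        // while the command pause is still in effect.
        assert!(!ctrl.self_resume());
        // Resuming the command pause may resume the reader, since no backfill pause is set.
        assert!(ctrl.command_resume());
        assert!(!ctrl.is_paused());
    }

    #[test]
    fn self_pause_resumes_on_next_barrier() {
        let mut ctrl = PauseControl::new();

        ctrl.self_pause();
        assert!(ctrl.is_paused());
        // With no other pause in effect, the next barrier's self_resume resumes the reader.
        assert!(ctrl.self_resume());
        assert!(!ctrl.is_paused());
    }
}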