risingwave_stream/executor/source/source_backfill_executor.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::{ColumnId, TableId};
25use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
26use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::types::JsonbVal;
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
31use risingwave_connector::source::{
32 BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
33 SplitMetaData,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use serde::{Deserialize, Serialize};
38use thiserror_ext::AsReport;
39
40use super::executor_core::StreamSourceCore;
41use super::source_backfill_state_table::BackfillStateTableHandler;
42use super::{apply_rate_limit, get_split_offset_col_idx};
43use crate::common::rate_limit::limited_chunk_size;
44use crate::executor::UpdateMutation;
45use crate::executor::prelude::*;
46use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
47use crate::task::CreateMviewProgressReporter;
48
/// Backfill state of a single split, persisted (inside `BackfillStateWithProgress`) in the
/// backfill state table.
///
/// Transitions are driven by `BackfillStage::handle_upstream_row` and
/// `BackfillStage::handle_backfill_row`:
/// `Backfilling` -> `SourceCachingUp` -> `Finished`, or `Backfilling` -> `Finished` directly.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
    /// Still reading historical data; the payload is the offset of the last backfilled row.
    ///
    /// `None` means not started yet. It's the initial state.
    /// XXX: perhaps we can also set to low-watermark instead of `None`
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). Upstream rows of this split with an offset
    /// up to and including this one are filtered out until the source catches up.
    SourceCachingUp(String),
    /// Backfill of this split is done; upstream rows of this split are forwarded as-is.
    Finished,
}
/// Per-split backfill states, keyed by split id.
pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;
59
/// Backfill state of one split plus progress information, serialized as JSON into the state table.
///
/// Only `state` field is the real state for fail-over.
/// Other fields are for observability (but we still need to persist them).
/// NOTE(review): `target_offset` is also consulted by `BackfillStage::handle_backfill_row` to
/// decide when to stop backfilling, so it does influence state transitions.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct BackfillStateWithProgress {
    /// The actual backfill state of the split.
    pub state: BackfillState,
    /// Number of rows the backfill reader has yielded for this split. It is incremented for every
    /// such row, including rows that are discarded because backfill already stopped.
    pub num_consumed_rows: u64,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
    /// so that we can finish backfilling even when upstream doesn't emit any data.
    pub target_offset: Option<String>,
}
71
72impl BackfillStateWithProgress {
73 pub fn encode_to_json(self) -> JsonbVal {
74 serde_json::to_value(self).unwrap().into()
75 }
76
77 pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
78 serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
79 }
80}
81
/// Executor that backfills a source's existing data from the external system while merging it
/// with the changelog of the upstream `SourceExecutor`, and then only forwards upstream data
/// once every split has finished backfilling.
pub struct SourceBackfillExecutor<S: StateStore> {
    /// The executor state and logic.
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    pub input: Executor,
}
87
/// State and configuration of the source backfill executor, separated from its upstream input.
pub struct SourceBackfillExecutorInner<S: StateStore> {
    /// Context of the actor running this executor; its id and fragment id are used for split
    /// lookup in mutations and as metric labels.
    actor_ctx: ActorContextRef,
    /// Executor info; `identity` appears in log messages.
    info: ExecutorInfo,

    /// Id of the source being backfilled.
    source_id: TableId,
    /// Name of the source, used in metric labels and error reports.
    source_name: String,
    /// Column ids of the source.
    /// NOTE(review): not read in the visible code; `build_stream_source_reader` derives the
    /// column ids from `SourceDesc` instead.
    column_ids: Vec<ColumnId>,
    /// Builder for the `SourceDesc`; taken (leaving `None`) at the start of `execute`.
    source_desc_builder: Option<SourceDescBuilder>,
    /// State table holding each split's `BackfillStateWithProgress`.
    backfill_state_store: BackfillStateTableHandler<S>,

    /// Metrics for monitor.
    metrics: Arc<StreamingMetrics>,
    /// Incremented every time a split-change mutation is applied by this actor.
    source_split_change_count: LabelGuardedIntCounter<4>,

    /// System parameter reader to read barrier interval
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s. `None` means no rate limiting; updated by `Throttle` mutations.
    rate_limit_rps: Option<u32>,

    /// Reports backfill progress and completion of the creating streaming job.
    progress: CreateMviewProgressReporter,
}
113
/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all available splits, so `unwrap()` after `get()` is safe.
#[derive(Debug)]
struct BackfillStage {
    /// Backfill state of every split assigned to this actor, keyed by split id.
    states: BackfillStates,
    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not updated. Update them with the offsets in `states` before use
    /// (see `get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}
127
impl BackfillStage {
    /// Sum of `num_consumed_rows` over all splits, used for progress reporting.
    fn total_backfilled_rows(&self) -> u64 {
        self.states.values().map(|s| s.num_consumed_rows).sum()
    }

    /// In debug builds, asserts that `states` and `splits` contain exactly the same split ids.
    /// Does nothing when `debug_assertions` is off.
    fn debug_assert_consistent(&self) {
        if cfg!(debug_assertions) {
            let all_splits: HashSet<_> =
                self.splits.iter().map(|split| split.id().clone()).collect();
            assert_eq!(
                self.states.keys().cloned().collect::<HashSet<_>>(),
                all_splits
            );
        }
    }

    /// Get unfinished splits with latest offsets according to the backfill states.
    ///
    /// Only splits in `Backfilling` state are returned:
    /// - a split with a recorded offset is cloned and updated in place to that offset, presumably
    ///   so a rebuilt reader resumes from there;
    /// - a split that has not started (`Backfilling(None)`) is returned unchanged.
    ///
    /// # Errors
    /// Propagates errors from `SplitImpl::update_in_place`.
    ///
    /// # Panics
    /// Panics if a split in `splits` has no entry in `states`.
    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
        let mut unfinished_splits = Vec::new();
        for split in &self.splits {
            let state = &self.states.get(split.id().as_ref()).unwrap().state;
            match state {
                BackfillState::Backfilling(Some(offset)) => {
                    let mut updated_split = split.clone();
                    updated_split.update_in_place(offset.clone())?;
                    unfinished_splits.push(updated_split);
                }
                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
            }
        }
        Ok(unfinished_splits)
    }

    /// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible.
    ///
    /// Behavior by current state of `split_id` (offsets compared with `compare_kafka_offset`,
    /// defined elsewhere in this file; assumed to order its first argument relative to its second):
    /// - `Backfilling(None)`: row dropped.
    /// - `Backfilling(Some(b))`: `b < offset` keeps backfilling; `b == offset` -> `Finished`;
    ///   `b > offset` -> `SourceCachingUp(b)`. The row is dropped in all cases.
    /// - `SourceCachingUp(b)`: `b < offset` -> `Finished` and the row is visible;
    ///   `b == offset` -> `Finished`, row dropped; `b > offset` stays, row dropped.
    /// - `Finished`: row visible.
    ///
    /// While the split is still `Backfilling` after the transition, `target_offset` is set to
    /// `offset`.
    ///
    /// # Panics
    /// Panics if `split_id` is not in `states`.
    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
        let mut vis = false;
        let state = self.states.get_mut(split_id).unwrap();
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(None) => {
                // backfilling for this split is not started yet. Ignore this row
            }
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // continue backfilling. Ignore this row
                    }
                    Ordering::Equal => {
                        // backfilling for this split is finished just right.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // backfilling for this split produced more data than current source's progress.
                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
                    }
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // Source caught up, but doesn't contain the last backfilled row.
                        // This may happen e.g., if Kafka performed compaction.
                        vis = true;
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Equal => {
                        // Source just caught up with backfilling.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // Source is still behind backfilling.
                    }
                }
            }
            BackfillState::Finished => {
                vis = true;
                // This split's backfilling is finished, we are waiting for other splits
            }
        }
        // Track upstream progress so that backfill knows where it can stop.
        if matches!(state_inner, BackfillState::Backfilling(_)) {
            state.target_offset = Some(offset.to_owned());
        }
        if vis {
            debug_assert_eq!(*state_inner, BackfillState::Finished);
        }
        vis
    }

    /// Updates backfill states and returns whether the row backfilled from external system is visible.
    ///
    /// Always increments `num_consumed_rows` for the split. While the split is `Backfilling`, the
    /// row is visible and the state records `offset`. Once `offset` reaches `target_offset`, the
    /// state becomes `SourceCachingUp(offset)` instead. In `SourceCachingUp` or `Finished`, the row
    /// is dropped.
    ///
    /// # Panics
    /// Panics if `split_id` is not in `states`.
    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
        let state = self.states.get_mut(split_id).unwrap();
        state.num_consumed_rows += 1;
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
                    // and dropping duplicated messages.
                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
                    // In this case, upstream hasn't reached the target_offset yet.
                    //
                    // Note2: after this, all following rows in the current chunk will be invisible.
                    //
                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
                    // keep backfilling.
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
                true
            }
            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
                // backfilling stopped. ignore
                false
            }
        }
    }
}
250
251impl<S: StateStore> SourceBackfillExecutorInner<S> {
252 #[expect(clippy::too_many_arguments)]
253 pub fn new(
254 actor_ctx: ActorContextRef,
255 info: ExecutorInfo,
256 stream_source_core: StreamSourceCore<S>,
257 metrics: Arc<StreamingMetrics>,
258 system_params: SystemParamsReaderRef,
259 backfill_state_store: BackfillStateTableHandler<S>,
260 rate_limit_rps: Option<u32>,
261 progress: CreateMviewProgressReporter,
262 ) -> Self {
263 let source_split_change_count = metrics
264 .source_split_change_count
265 .with_guarded_label_values(&[
266 &stream_source_core.source_id.to_string(),
267 &stream_source_core.source_name,
268 &actor_ctx.id.to_string(),
269 &actor_ctx.fragment_id.to_string(),
270 ]);
271
272 Self {
273 actor_ctx,
274 info,
275 source_id: stream_source_core.source_id,
276 source_name: stream_source_core.source_name,
277 column_ids: stream_source_core.column_ids,
278 source_desc_builder: stream_source_core.source_desc_builder,
279 backfill_state_store,
280 metrics,
281 source_split_change_count,
282 system_params,
283 rate_limit_rps,
284 progress,
285 }
286 }
287
288 async fn build_stream_source_reader(
289 &self,
290 source_desc: &SourceDesc,
291 splits: Vec<SplitImpl>,
292 ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
293 let column_ids = source_desc
294 .columns
295 .iter()
296 .map(|column_desc| column_desc.column_id)
297 .collect_vec();
298 let source_ctx = SourceContext::new(
299 self.actor_ctx.id,
300 self.source_id,
301 self.actor_ctx.fragment_id,
302 self.source_name.clone(),
303 source_desc.metrics.clone(),
304 SourceCtrlOpts {
305 chunk_size: limited_chunk_size(self.rate_limit_rps),
306 split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
307 },
308 source_desc.source.config.clone(),
309 None,
310 );
311
312 // We will check watermark to decide whether we need to backfill.
313 // e.g., when there's a Kafka topic-partition without any data,
314 // we don't need to backfill at all. But if we do not check here,
315 // the executor can only know it's finished when data coming in.
316 // For blocking DDL, this would be annoying.
317
318 let (stream, res) = source_desc
319 .source
320 .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
321 .await
322 .map_err(StreamExecutorError::connector_error)?;
323 Ok((
324 apply_rate_limit(stream, self.rate_limit_rps).boxed(),
325 res.backfill_info,
326 ))
327 }
328
    /// Runs the source backfill executor as a message stream.
    ///
    /// Phases, as implemented below:
    /// 1. **Init**: waits for the first barrier on `input`, takes this actor's initial split
    ///    assignment from it and forwards it. It then restores each split's state from the
    ///    committed state table (defaulting to `Backfilling(None)`) and builds a backfill reader
    ///    over the unfinished splits. Splits the connector reports as `NoDataToBackfill` are
    ///    marked `Finished`; for the others, `target_offset` is seeded with the connector's
    ///    latest offset.
    /// 2. **Backfill stage** (`'backfill_loop`): merges upstream messages (`Either::Left`) with
    ///    backfill chunks (`Either::Right`), preferring upstream.
    ///    - Row visibility is decided by `BackfillStage::handle_upstream_row` and
    ///      `BackfillStage::handle_backfill_row`.
    ///    - On every barrier, the states are written and committed, pause/resume/throttle/
    ///      split-change mutations are applied, and progress is reported.
    ///    - The loop is left only when `backfill_finished` returns `true`.
    /// 3. **Forward stage**: persists the final states, then forwards every upstream message
    ///    as-is. On barriers it still commits the state table and applies split changes.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute(mut self, input: Executor) {
        let mut input = input.execute();

        // Poll the upstream to get the first barrier.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        let owned_splits = barrier
            .initial_split_assignment(self.actor_ctx.id)
            .unwrap_or(&[])
            .to_vec();
        let is_pause_on_startup = barrier.is_pause_on_startup();
        yield Message::Barrier(barrier);

        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
        let source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        self.backfill_state_store.init_epoch(first_epoch).await?;

        // Restore each assigned split's state from the committed state table; splits without a
        // persisted state start from scratch.
        let mut backfill_states: BackfillStates = HashMap::new();
        {
            let committed_reader = self
                .backfill_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for split in &owned_splits {
                let split_id = split.id();
                let backfill_state = committed_reader
                    .try_recover_from_state_store(&split_id)
                    .await?
                    .unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Backfilling(None),
                        num_consumed_rows: 0,
                        target_offset: None, // init with None
                    });
                backfill_states.insert(split_id, backfill_state);
            }
        }
        let mut backfill_stage = BackfillStage {
            states: backfill_states,
            splits: owned_splits,
        };
        backfill_stage.debug_assert_consistent();

        let (source_chunk_reader, backfill_info) = self
            .build_stream_source_reader(
                &source_desc,
                backfill_stage.get_latest_unfinished_splits()?,
            )
            .instrument_await("source_build_reader")
            .await?;
        for (split_id, info) in &backfill_info {
            let state = backfill_stage.states.get_mut(split_id).unwrap();
            match info {
                BackfillInfo::NoDataToBackfill => {
                    state.state = BackfillState::Finished;
                }
                BackfillInfo::HasDataToBackfill { latest_offset } => {
                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
                    state.target_offset = Some(latest_offset.clone());
                }
            }
        }
        tracing::debug!(?backfill_stage, "source backfill started");

        // Always try the left (upstream) stream first; the right (backfill) stream is polled
        // only when upstream has nothing ready.
        fn select_strategy(_: &mut ()) -> PollNext {
            futures::stream::PollNext::Left
        }

        // We choose "preferring upstream" strategy here, because:
        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
        //   For chunks from upstream, they are simply dropped, so there's no much overhead.
        //   So possibly this can also affect other running jobs less.
        // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
        let mut backfill_stream = select_with_strategy(
            input.by_ref().map(Either::Left),
            source_chunk_reader.map(Either::Right),
            select_strategy,
        );

        // The backfill reader set aside while paused; restored on resume.
        type PausedReader = Option<impl Stream>;
        let mut paused_reader: PausedReader = None;
        // Whether the reader is paused by a `Pause` mutation (or pause-on-startup).
        let mut command_paused = false;

        // Replaces the backfill side of `backfill_stream` with a never-ready stream and stashes
        // the real reader in `paused_reader`. Upstream keeps being polled.
        macro_rules! pause_reader {
            () => {
                let (left, right) = backfill_stream.into_inner();
                backfill_stream = select_with_strategy(
                    left,
                    futures::stream::pending().boxed().map(Either::Right),
                    select_strategy,
                );
                // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
                paused_reader = Some(right);
            };
        }

        // If the first barrier requires us to pause on startup, pause the stream.
        if is_pause_on_startup {
            command_paused = true;
            pause_reader!();
        }

        // Raw state store handle and table id. They are used to wait for a committed epoch before
        // `backfill_finished` reads other actors' persisted states.
        let state_store = self
            .backfill_state_store
            .state_store()
            .state_store()
            .clone();
        let table_id = self.backfill_state_store.state_store().table_id().into();
        // Set once the wait for the committed epoch (below) has completed.
        let mut state_table_initialized = false;
        {
            let source_backfill_row_count = self
                .metrics
                .source_backfill_row_count
                .with_guarded_label_values(&[
                    &self.source_id.to_string(),
                    &self.source_name,
                    &self.actor_ctx.id.to_string(),
                    &self.actor_ctx.fragment_id.to_string(),
                ]);

            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
            // milliseconds, considering some other latencies like network and cost in Meta.
            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                as u128
                * WAIT_BARRIER_MULTIPLE_TIMES;
            let mut last_barrier_time = Instant::now();
            // Whether the reader is paused by this executor because no barrier arrived in time.
            let mut self_paused = false;

            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
            'backfill_loop: while let Some(either) = backfill_stream.next().await {
                match either {
                    // Upstream
                    Either::Left(msg) => {
                        // NOTE(review): this branch handles an error from the upstream `input`,
                        // yet it logs "stream source reader error" and rebuilds the backfill
                        // reader. Errors of the backfill reader itself are propagated via `?` in
                        // the `Either::Right` branch. Confirm this is intended.
                        let Ok(msg) = msg else {
                            let e = msg.unwrap_err();
                            tracing::warn!(
                                error = ?e.as_report(),
                                source_id = %self.source_id,
                                "stream source reader error",
                            );
                            GLOBAL_ERROR_METRICS.user_source_error.report([
                                "SourceReaderError".to_owned(),
                                self.source_id.to_string(),
                                self.source_name.to_owned(),
                                self.actor_ctx.fragment_id.to_string(),
                            ]);

                            let (reader, _backfill_info) = self
                                .build_stream_source_reader(
                                    &source_desc,
                                    backfill_stage.get_latest_unfinished_splits()?,
                                )
                                .await?;

                            backfill_stream = select_with_strategy(
                                input.by_ref().map(Either::Left),
                                reader.map(Either::Right),
                                select_strategy,
                            );
                            continue;
                        };
                        match msg {
                            Message::Barrier(barrier) => {
                                last_barrier_time = Instant::now();

                                // A barrier arrived, so lift the self-imposed pause (see the
                                // `Either::Right` branch) unless a `Pause` command is in effect.
                                if self_paused {
                                    // command_paused has a higher priority.
                                    if !command_paused {
                                        backfill_stream = select_with_strategy(
                                            input.by_ref().map(Either::Left),
                                            paused_reader
                                                .take()
                                                .expect("no paused reader to resume"),
                                            select_strategy,
                                        );
                                    }
                                    self_paused = false;
                                }

                                // `Some((target_splits, should_trim_state))` if this barrier
                                // reassigns splits of this actor; applied after the barrier is
                                // yielded.
                                let mut split_changed = None;
                                if let Some(ref mutation) = barrier.mutation.as_deref() {
                                    match mutation {
                                        Mutation::Pause => {
                                            // pause_reader should not be invoked consecutively more than once.
                                            if !command_paused {
                                                pause_reader!();
                                                command_paused = true;
                                            } else {
                                                tracing::warn!(command_paused, "unexpected pause");
                                            }
                                        }
                                        Mutation::Resume => {
                                            // pause_reader.take should not be invoked consecutively more than once.
                                            if command_paused {
                                                backfill_stream = select_with_strategy(
                                                    input.by_ref().map(Either::Left),
                                                    paused_reader
                                                        .take()
                                                        .expect("no paused reader to resume"),
                                                    select_strategy,
                                                );
                                                command_paused = false;
                                            } else {
                                                tracing::warn!(command_paused, "unexpected resume");
                                            }
                                        }
                                        Mutation::SourceChangeSplit(actor_splits) => {
                                            tracing::info!(
                                                actor_splits = ?actor_splits,
                                                "source change split received"
                                            );
                                            split_changed = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| (target_splits, true));
                                        }
                                        Mutation::Update(UpdateMutation {
                                            actor_splits, ..
                                        }) => {
                                            split_changed = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| (target_splits, false));
                                        }
                                        Mutation::Throttle(actor_to_apply) => {
                                            // NOTE(review): the rebuild below installs a fresh
                                            // reader even when `command_paused` is true, which
                                            // seems to resume backfilling while paused. A later
                                            // `Resume` would then swap in the stale
                                            // `paused_reader`. Confirm this is intended.
                                            if let Some(new_rate_limit) =
                                                actor_to_apply.get(&self.actor_ctx.id)
                                                && *new_rate_limit != self.rate_limit_rps
                                            {
                                                tracing::info!(
                                                    "updating rate limit from {:?} to {:?}",
                                                    self.rate_limit_rps,
                                                    *new_rate_limit
                                                );
                                                self.rate_limit_rps = *new_rate_limit;
                                                // rebuild reader
                                                let (reader, _backfill_info) = self
                                                    .build_stream_source_reader(
                                                        &source_desc,
                                                        backfill_stage
                                                            .get_latest_unfinished_splits()?,
                                                    )
                                                    .await?;

                                                backfill_stream = select_with_strategy(
                                                    input.by_ref().map(Either::Left),
                                                    reader.map(Either::Right),
                                                    select_strategy,
                                                );
                                            }
                                        }
                                        _ => {}
                                    }
                                }
                                // Builds a new backfill reader over the latest unfinished splits.
                                // NOTE(review): written as a free function taking `this` instead
                                // of a method, presumably to work around borrow/lifetime issues
                                // inside the `try_stream` body.
                                async fn rebuild_reader_on_split_changed(
                                    this: &SourceBackfillExecutorInner<impl StateStore>,
                                    backfill_stage: &BackfillStage,
                                    source_desc: &SourceDesc,
                                ) -> StreamExecutorResult<BoxSourceChunkStream>
                                {
                                    // rebuild backfill_stream

                                    let latest_unfinished_splits =
                                        backfill_stage.get_latest_unfinished_splits()?;
                                    tracing::info!(
                                        "actor {:?} apply source split change to {:?}",
                                        this.actor_ctx.id,
                                        latest_unfinished_splits
                                    );

                                    // Replace the source reader with a new one of the new state.
                                    let (reader, _backfill_info) = this
                                        .build_stream_source_reader(
                                            source_desc,
                                            latest_unfinished_splits,
                                        )
                                        .await?;

                                    Ok(reader)
                                }

                                // Persist and commit all split states at this barrier's epoch.
                                self.backfill_state_store
                                    .set_states(backfill_stage.states.clone())
                                    .await?;
                                self.backfill_state_store.commit(barrier.epoch).await?;

                                if self.should_report_finished(&backfill_stage.states) {
                                    // drop the backfill kafka consumers
                                    // (i.e. replace the backfill reader with a never-ready stream)
                                    backfill_stream = select_with_strategy(
                                        input.by_ref().map(Either::Left),
                                        futures::stream::pending().boxed().map(Either::Right),
                                        select_strategy,
                                    );

                                    self.progress.finish(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    let is_checkpoint = barrier.is_checkpoint();
                                    // yield barrier after reporting progress
                                    yield Message::Barrier(barrier);

                                    // NOTE(review): the "split changed" result is ignored here and
                                    // no reader is rebuilt, so an unfinished split migrated to this
                                    // actor after finish is not read by this loop. See
                                    // `should_report_finished` for the scaling assumptions.
                                    if let Some((target_splits, should_trim_state)) = split_changed
                                    {
                                        self.apply_split_change_after_yield_barrier(
                                            barrier_epoch,
                                            target_splits,
                                            &mut backfill_stage,
                                            should_trim_state,
                                        )
                                        .await?;
                                    }

                                    if !state_table_initialized {
                                        if is_checkpoint {
                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
                                            // We wait for 2nd epoch
                                            let epoch = barrier_epoch.prev;
                                            tracing::info!("waiting for epoch: {}", epoch);
                                            state_store
                                                .try_wait_epoch(
                                                    HummockReadEpoch::Committed(epoch),
                                                    TryWaitEpochOptions { table_id },
                                                )
                                                .await?;
                                            tracing::info!("finished waiting for epoch: {}", epoch);
                                            state_table_initialized = true;
                                        }
                                    } else {
                                        // After we reported finished, we still don't exit the loop.
                                        // Because we need to handle split migration.
                                        assert!(
                                            state_table_initialized,
                                            "state table should be initialized before checking backfill finished"
                                        );
                                        if self.backfill_finished(&backfill_stage.states).await? {
                                            tracing::info!("source backfill finished");
                                            break 'backfill_loop;
                                        }
                                    }
                                } else {
                                    self.progress.update_for_source_backfill(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    // yield barrier after reporting progress
                                    yield Message::Barrier(barrier);

                                    if let Some((target_splits, should_trim_state)) = split_changed
                                    {
                                        if self
                                            .apply_split_change_after_yield_barrier(
                                                barrier_epoch,
                                                target_splits,
                                                &mut backfill_stage,
                                                should_trim_state,
                                            )
                                            .await?
                                        {
                                            let reader = rebuild_reader_on_split_changed(
                                                &self,
                                                &backfill_stage,
                                                &source_desc,
                                            )
                                            .await?;

                                            backfill_stream = select_with_strategy(
                                                input.by_ref().map(Either::Left),
                                                reader.map(Either::Right),
                                                select_strategy,
                                            );
                                        }
                                    }
                                }
                            }
                            Message::Chunk(chunk) => {
                                // We need to iterate over all rows because there might be multiple splits in a chunk.
                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
                                // NOTE(review): `i` counts the rows yielded by `chunk.rows()` but is
                                // used as a position in a bitmap sized to the full visibility
                                // length. This only lines up if the incoming chunk has no
                                // invisible rows. Confirm upstream chunks are always fully visible.
                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                                for (i, (_, row)) in chunk.rows().enumerate() {
                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                                    let vis = backfill_stage.handle_upstream_row(split, offset);
                                    new_vis.set(i, vis);
                                }
                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
                                let new_vis = new_vis.finish();
                                if new_vis.count_ones() != 0 {
                                    let new_chunk = chunk.clone_with_vis(new_vis);
                                    yield Message::Chunk(new_chunk);
                                }
                            }
                            Message::Watermark(_) => {
                                // Ignore watermark during backfill.
                            }
                        }
                    }
                    // backfill
                    Either::Right(msg) => {
                        let chunk = msg?;

                        // Too long without a barrier: pause the backfill reader until the next one.
                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                            assert!(!command_paused, "command_paused should be false");
                            // pause_reader should not be invoked consecutively more than once.
                            if !self_paused {
                                pause_reader!();
                            } else {
                                tracing::warn!(self_paused, "unexpected self pause");
                            }
                            // Exceeds the max wait barrier time, the source will be paused.
                            // Currently we can guarantee the
                            // source is not paused since it received stream
                            // chunks.
                            self_paused = true;
                            tracing::warn!(
                                "source {} paused, wait barrier for {:?}",
                                self.info.identity,
                                last_barrier_time.elapsed()
                            );

                            // Only update `max_wait_barrier_time_ms` to capture
                            // `barrier_interval_ms`
                            // changes here to avoid frequently accessing the shared
                            // `system_params`.
                            max_wait_barrier_time_ms =
                                self.system_params.load().barrier_interval_ms() as u128
                                    * WAIT_BARRIER_MULTIPLE_TIMES;
                        }
                        // The chunk that triggered the pause is still processed below.
                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                        for (i, (_, row)) in chunk.rows().enumerate() {
                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
                            new_vis.set(i, vis);
                        }

                        let new_vis = new_vis.finish();
                        let card = new_vis.count_ones();
                        if card != 0 {
                            let new_chunk = chunk.clone_with_vis(new_vis);
                            yield Message::Chunk(new_chunk);
                            source_backfill_row_count.inc_by(card as u64);
                        }
                    }
                }
            }
        }

        // Release the `input.by_ref()` borrow held by `backfill_stream` so `input` can be
        // consumed by the forward loop below.
        std::mem::drop(backfill_stream);
        let mut states = backfill_stage.states;
        // Make sure `Finished` state is persisted.
        self.backfill_state_store.set_states(states.clone()).await?;

        // All splits finished backfilling. Now we only forward the source data.
        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Barrier(barrier) => {
                    let mut split_changed = None;
                    if let Some(ref mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause | Mutation::Resume => {
                                // We don't need to do anything. Handled by upstream.
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, true));
                            }
                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, false));
                            }
                            _ => {}
                        }
                    }
                    self.backfill_state_store.commit(barrier.epoch).await?;
                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((target_splits, state, should_trim_state)) = split_changed {
                        self.apply_split_change_forward_stage_after_yield_barrier(
                            barrier_epoch,
                            target_splits,
                            state,
                            should_trim_state,
                        )
                        .await?;
                    }
                }
                Message::Chunk(chunk) => {
                    yield Message::Chunk(chunk);
                }
                Message::Watermark(watermark) => {
                    yield Message::Watermark(watermark);
                }
            }
        }
    }
850
851 /// When we should call `progress.finish()` to let blocking DDL return.
852 /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
853 ///
854 /// Note: split migration (online scaling) is related with progress tracking.
855 /// - For foreground DDL, scaling is not allowed before progress is finished.
856 /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
857 ///
858 /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
859 fn should_report_finished(&self, states: &BackfillStates) -> bool {
860 states.values().all(|state| {
861 matches!(
862 state.state,
863 BackfillState::Finished | BackfillState::SourceCachingUp(_)
864 )
865 })
866 }
867
868 /// All splits entered `Finished` state.
869 ///
870 /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
871 /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
872 /// this actor, we still need to backfill it.
873 ///
874 /// Note: at the beginning, the actor will only read the state written by itself.
875 /// It needs to _wait until it can read all actors' written data_.
876 /// i.e., wait for the second checkpoint has been available.
877 ///
878 /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
879 async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
880 Ok(states
881 .values()
882 .all(|state| matches!(state.state, BackfillState::Finished))
883 && self
884 .backfill_state_store
885 .scan_may_stale()
886 .await?
887 .into_iter()
888 .all(|state| matches!(state.state, BackfillState::Finished)))
889 }
890
891 /// For newly added splits, we do not need to backfill and can directly forward from upstream.
892 async fn apply_split_change_after_yield_barrier(
893 &mut self,
894 barrier_epoch: EpochPair,
895 target_splits: Vec<SplitImpl>,
896 stage: &mut BackfillStage,
897 should_trim_state: bool,
898 ) -> StreamExecutorResult<bool> {
899 self.source_split_change_count.inc();
900 {
901 if self
902 .update_state_if_changed(barrier_epoch, target_splits, stage, should_trim_state)
903 .await?
904 {
905 // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
906 return Ok(true);
907 }
908 }
909
910 Ok(false)
911 }
912
913 /// Returns `true` if split changed. Otherwise `false`.
914 async fn update_state_if_changed(
915 &mut self,
916 barrier_epoch: EpochPair,
917 target_splits: Vec<SplitImpl>,
918 stage: &mut BackfillStage,
919 should_trim_state: bool,
920 ) -> StreamExecutorResult<bool> {
921 let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
922
923 let mut split_changed = false;
924 // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
925 // Will be set to target_state in the end.
926 let old_states = std::mem::take(&mut stage.states);
927 let committed_reader = self
928 .backfill_state_store
929 .new_committed_reader(barrier_epoch)
930 .await?;
931 // Iterate over the target (assigned) splits
932 // - check if any new splits are added
933 // - build target_state
934 for split in &target_splits {
935 let split_id = split.id();
936 if let Some(s) = old_states.get(&split_id) {
937 target_state.insert(split_id, s.clone());
938 } else {
939 split_changed = true;
940
941 let backfill_state = committed_reader
942 .try_recover_from_state_store(&split_id)
943 .await?;
944 match backfill_state {
945 None => {
946 // Newly added split. We don't need to backfill.
947 // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
948 target_state.insert(
949 split_id,
950 BackfillStateWithProgress {
951 state: BackfillState::Finished,
952 num_consumed_rows: 0,
953 target_offset: None,
954 },
955 );
956 }
957 Some(backfill_state) => {
958 // Migrated split. Backfill if unfinished.
959 target_state.insert(split_id, backfill_state);
960 }
961 }
962 }
963 }
964
965 // Checks dropped splits
966 for existing_split_id in old_states.keys() {
967 if !target_state.contains_key(existing_split_id) {
968 tracing::info!("split dropping detected: {}", existing_split_id);
969 split_changed = true;
970 }
971 }
972
973 if split_changed {
974 let dropped_splits = stage
975 .states
976 .extract_if(|split_id, _| !target_state.contains_key(split_id))
977 .map(|(split_id, _)| split_id);
978
979 if should_trim_state {
980 // trim dropped splits' state
981 self.backfill_state_store.trim_state(dropped_splits).await?;
982 }
983 tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
984 } else {
985 debug_assert_eq!(old_states, target_state);
986 }
987 stage.states = target_state;
988 stage.splits = target_splits;
989 stage.debug_assert_consistent();
990 Ok(split_changed)
991 }
992
993 /// For split change during forward stage, all newly added splits should be already finished.
994 // We just need to update the state store if necessary.
995 async fn apply_split_change_forward_stage_after_yield_barrier(
996 &mut self,
997 barrier_epoch: EpochPair,
998 target_splits: Vec<SplitImpl>,
999 states: &mut BackfillStates,
1000 should_trim_state: bool,
1001 ) -> StreamExecutorResult<()> {
1002 self.source_split_change_count.inc();
1003 {
1004 self.update_state_if_changed_forward_stage(
1005 barrier_epoch,
1006 target_splits,
1007 states,
1008 should_trim_state,
1009 )
1010 .await?;
1011 }
1012
1013 Ok(())
1014 }
1015
1016 async fn update_state_if_changed_forward_stage(
1017 &mut self,
1018 barrier_epoch: EpochPair,
1019 target_splits: Vec<SplitImpl>,
1020 states: &mut BackfillStates,
1021 should_trim_state: bool,
1022 ) -> StreamExecutorResult<()> {
1023 let target_splits: HashSet<SplitId> = target_splits
1024 .into_iter()
1025 .map(|split| (split.id()))
1026 .collect();
1027
1028 let mut split_changed = false;
1029 let mut newly_added_splits = vec![];
1030
1031 let committed_reader = self
1032 .backfill_state_store
1033 .new_committed_reader(barrier_epoch)
1034 .await?;
1035
1036 // Checks added splits
1037 for split_id in &target_splits {
1038 if !states.contains_key(split_id) {
1039 split_changed = true;
1040
1041 let backfill_state = committed_reader
1042 .try_recover_from_state_store(split_id)
1043 .await?;
1044 match &backfill_state {
1045 None => {
1046 // Newly added split. We don't need to backfill!
1047 newly_added_splits.push(split_id.clone());
1048 }
1049 Some(backfill_state) => {
1050 // Migrated split. It should also be finished since we are in forwarding stage.
1051 match backfill_state.state {
1052 BackfillState::Finished => {}
1053 _ => {
1054 return Err(anyhow::anyhow!(
1055 "Unexpected backfill state: {:?}",
1056 backfill_state
1057 )
1058 .into());
1059 }
1060 }
1061 }
1062 }
1063 states.insert(
1064 split_id.clone(),
1065 backfill_state.unwrap_or(BackfillStateWithProgress {
1066 state: BackfillState::Finished,
1067 num_consumed_rows: 0,
1068 target_offset: None,
1069 }),
1070 );
1071 }
1072 }
1073
1074 // Checks dropped splits
1075 for existing_split_id in states.keys() {
1076 if !target_splits.contains(existing_split_id) {
1077 tracing::info!("split dropping detected: {}", existing_split_id);
1078 split_changed = true;
1079 }
1080 }
1081
1082 if split_changed {
1083 tracing::info!(
1084 target_splits = ?target_splits,
1085 "apply split change"
1086 );
1087
1088 let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1089
1090 if should_trim_state {
1091 // trim dropped splits' state
1092 self.backfill_state_store
1093 .trim_state(dropped_splits.map(|(k, _v)| k))
1094 .await?;
1095 }
1096
1097 // For migrated splits, and existing splits, we do not need to update
1098 // state store, but only for newly added splits.
1099 self.backfill_state_store
1100 .set_states(
1101 newly_added_splits
1102 .into_iter()
1103 .map(|split_id| {
1104 (
1105 split_id,
1106 BackfillStateWithProgress {
1107 state: BackfillState::Finished,
1108 num_consumed_rows: 0,
1109 target_offset: None,
1110 },
1111 )
1112 })
1113 .collect(),
1114 )
1115 .await?;
1116 }
1117
1118 Ok(())
1119 }
1120}
1121
/// Compares two Kafka offsets numerically.
///
/// Both offsets are given as strings. They are parsed as `i64` before comparing, because a
/// lexicographic comparison would be wrong (e.g. `"10"` vs `"9"`).
///
/// # Panics
///
/// Panics if either argument is not a valid `i64`. The panic message names the offending value.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    /// Parses a single offset, panicking with the offending value on failure.
    fn parse_offset(offset: &str) -> i64 {
        offset
            .parse::<i64>()
            .unwrap_or_else(|e| panic!("invalid kafka offset {offset:?}: {e}"))
    }
    parse_offset(a).cmp(&parse_offset(b))
}
1127
1128impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
1129 fn execute(self: Box<Self>) -> BoxedMessageStream {
1130 self.inner.execute(self.input).boxed()
1131 }
1132}
1133
1134impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
1135 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1136 f.debug_struct("SourceBackfillExecutor")
1137 .field("source_id", &self.source_id)
1138 .field("column_ids", &self.column_ids)
1139 .field("pk_indices", &self.info.pk_indices)
1140 .finish()
1141 }
1142}