risingwave_stream/executor/source/source_backfill_executor.rs
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::time::Instant;

use anyhow::anyhow;
use either::Either;
use futures::stream::{PollNext, select_with_strategy};
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::JsonbVal;
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
    SplitMetaData,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::TryWaitEpochOptions;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

use super::executor_core::StreamSourceCore;
use super::source_backfill_state_table::BackfillStateTableHandler;
use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::task::CreateMviewProgressReporter;

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
    /// `None` means not started yet. It's the initial state.
    /// XXX: perhaps we can also set to low-watermark instead of `None`
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
    SourceCachingUp(String),
    Finished,
}
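
// An illustrative sketch of one split's lifecycle, as driven by
// `BackfillStage::handle_backfill_row` and `handle_upstream_row` below
// (the state diagram linked from `BackfillStage` is the authoritative description):
//
//   Backfilling(None)     --first backfilled row at offset o-->   Backfilling(Some(o))
//   Backfilling(Some(o))  --upstream row arrives at offset o-->   Finished
//   Backfilling(Some(o))  --target offset reached by backfill-->  SourceCachingUp(o)
//   SourceCachingUp(o)    --upstream row passes offset o-->       Finished
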
pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;

/// Only the `state` field is the real state used for failover.
/// Other fields are for observability (but we still need to persist them).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct BackfillStateWithProgress {
    pub state: BackfillState,
    pub num_consumed_rows: u64,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
    /// so that we can finish backfilling even when upstream doesn't emit any data.
    pub target_offset: Option<String>,
}

impl BackfillStateWithProgress {
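    /// Serialize the backfill progress into a `JSONB` value for the state table.
    ///
    /// A minimal round-trip sketch (illustrative only, not compiled as a doctest):
    ///
    /// ```ignore
    /// let progress = BackfillStateWithProgress {
    ///     state: BackfillState::Backfilling(Some("42".to_owned())),
    ///     num_consumed_rows: 100,
    ///     target_offset: Some("100".to_owned()),
    /// };
    /// let encoded = progress.clone().encode_to_json();
    /// let decoded = BackfillStateWithProgress::restore_from_json(encoded).unwrap();
    /// assert_eq!(progress, decoded);
    /// ```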
    pub fn encode_to_json(self) -> JsonbVal {
        serde_json::to_value(self).unwrap().into()
    }

    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
    }
}

pub struct SourceBackfillExecutor<S: StateStore> {
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    pub input: Executor,
}

pub struct SourceBackfillExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Streaming source for external systems.
    source_id: TableId,
    source_name: String,
    column_ids: Vec<ColumnId>,
    source_desc_builder: Option<SourceDescBuilder>,
    backfill_state_store: BackfillStateTableHandler<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,
    source_split_change_count: LabelGuardedIntCounter,

    // /// Receiver of barrier channel.
    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
    /// System parameter reader to read barrier interval
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    progress: CreateMviewProgressReporter,
}

/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all available splits, so we can `unwrap()` safely when we `get()`.
#[derive(Debug)]
struct BackfillStage {
    states: BackfillStates,
    /// A copy of all splits (including unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}

enum ApplyMutationAfterBarrier {
    SourceChangeSplit {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    },
    ConnectorPropsChange,
}

impl BackfillStage {
    fn total_backfilled_rows(&self) -> u64 {
        self.states.values().map(|s| s.num_consumed_rows).sum()
    }

    fn debug_assert_consistent(&self) {
        if cfg!(debug_assertions) {
            let all_splits: HashSet<_> = self.splits.iter().map(|split| split.id()).collect();
            assert_eq!(
                self.states.keys().cloned().collect::<HashSet<_>>(),
                all_splits
            );
        }
    }

    /// Get unfinished splits with latest offsets according to the backfill states.
    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
        let mut unfinished_splits = Vec::new();
        for split in &self.splits {
            let state = &self.states.get(split.id().as_ref()).unwrap().state;
            match state {
                BackfillState::Backfilling(Some(offset)) => {
                    let mut updated_split = split.clone();
                    updated_split.update_in_place(offset.clone())?;
                    unfinished_splits.push(updated_split);
                }
                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
            }
        }
        Ok(unfinished_splits)
    }

    /// Updates backfill states and `target_offset`s and returns whether the row from upstream `SourceExecutor` is visible.
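    ///
    /// A worked example, assuming Kafka-style numeric offsets: if this split is in
    /// `Backfilling(Some("5"))` (backfill has already emitted rows up to offset 5), then
    /// - an upstream row at offset `"3"` is dropped and the state becomes `SourceCachingUp("5")`,
    ///   because the upstream source is still behind the backfill;
    /// - an upstream row at offset `"5"` is dropped and the state becomes `Finished`;
    /// - an upstream row at offset `"7"` is dropped and backfilling simply continues.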
    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
        let mut vis = false;
        let state = self.states.get_mut(split_id).unwrap();
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(None) => {
                // backfilling for this split is not started yet. Ignore this row
            }
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // continue backfilling. Ignore this row
                    }
                    Ordering::Equal => {
                        // backfilling for this split is finished just right.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // backfilling for this split produced more data than current source's progress.
                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
                    }
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // Source caught up, but doesn't contain the last backfilled row.
                        // This may happen e.g., if Kafka performed compaction.
                        vis = true;
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Equal => {
                        // Source just caught up with backfilling.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // Source is still behind backfilling.
                    }
                }
            }
            BackfillState::Finished => {
                vis = true;
                // This split's backfilling is finished, we are waiting for other splits
            }
        }
        if matches!(state_inner, BackfillState::Backfilling(_)) {
            state.target_offset = Some(offset.to_owned());
        }
        if vis {
            debug_assert_eq!(*state_inner, BackfillState::Finished);
        }
        vis
    }

    /// Updates backfill states and returns whether the row backfilled from the external system is visible.
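    ///
    /// A worked example, assuming Kafka-style numeric offsets: with `target_offset = Some("10")`,
    /// a backfilled row at offset `"8"` is emitted and the state advances to
    /// `Backfilling(Some("8"))`; a backfilled row at offset `"10"` (or beyond) is emitted and the
    /// state becomes `SourceCachingUp`, after which any further backfilled rows are dropped.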
    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
        let state = self.states.get_mut(split_id).unwrap();
        state.num_consumed_rows += 1;
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
                    // and dropping duplicated messages.
                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
                    // In this case, upstream hasn't reached the target_offset yet.
                    //
                    // Note2: after this, all following rows in the current chunk will be invisible.
                    //
                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
                    // keep backfilling.
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
                true
            }
            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
                // backfilling stopped. ignore
                false
            }
        }
    }
}

impl<S: StateStore> SourceBackfillExecutorInner<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        info: ExecutorInfo,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        system_params: SystemParamsReaderRef,
        backfill_state_store: BackfillStateTableHandler<S>,
        rate_limit_rps: Option<u32>,
        progress: CreateMviewProgressReporter,
    ) -> Self {
        let source_split_change_count = metrics
            .source_split_change_count
            .with_guarded_label_values(&[
                &stream_source_core.source_id.to_string(),
                &stream_source_core.source_name,
                &actor_ctx.id.to_string(),
                &actor_ctx.fragment_id.to_string(),
            ]);

        Self {
            actor_ctx,
            info,
            source_id: stream_source_core.source_id,
            source_name: stream_source_core.source_name,
            column_ids: stream_source_core.column_ids,
            source_desc_builder: stream_source_core.source_desc_builder,
            backfill_state_store,
            metrics,
            source_split_change_count,
            system_params,
            rate_limit_rps,
            progress,
        }
    }

    async fn build_stream_source_reader(
        &self,
        source_desc: &SourceDesc,
        splits: Vec<SplitImpl>,
    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
            },
            source_desc.source.config.clone(),
            None,
        );

        // We will check the watermark to decide whether we need to backfill.
        // e.g., when there's a Kafka topic-partition without any data,
        // we don't need to backfill at all. But if we did not check here,
        // the executor could only know it's finished when data comes in.
        // For blocking DDL, this would be annoying.

        let (stream, res) = source_desc
            .source
            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        Ok((
            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
            res.backfill_info,
        ))
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute(mut self, input: Executor) {
        let mut input = input.execute();

        // Poll the upstream to get the first barrier.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        let owned_splits = barrier
            .initial_split_assignment(self.actor_ctx.id)
            .unwrap_or(&[])
            .to_vec();

        let mut pause_control = PauseControl::new();
        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
            pause_control.backfill_pause();
        }
        if barrier.is_pause_on_startup() {
            pause_control.command_pause();
        }
        yield Message::Barrier(barrier);

        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;
        let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        self.backfill_state_store.init_epoch(first_epoch).await?;

        let mut backfill_states: BackfillStates = HashMap::new();
        {
            let committed_reader = self
                .backfill_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for split in &owned_splits {
                let split_id = split.id();
                let backfill_state = committed_reader
                    .try_recover_from_state_store(&split_id)
                    .await?
                    .unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Backfilling(None),
                        num_consumed_rows: 0,
                        target_offset: None, // init with None
                    });
                backfill_states.insert(split_id, backfill_state);
            }
        }
        let mut backfill_stage = BackfillStage {
            states: backfill_states,
            splits: owned_splits,
        };
        backfill_stage.debug_assert_consistent();

        let (source_chunk_reader, backfill_info) = self
            .build_stream_source_reader(
                &source_desc,
                backfill_stage.get_latest_unfinished_splits()?,
            )
            .instrument_await("source_build_reader")
            .await?;
        for (split_id, info) in &backfill_info {
            let state = backfill_stage.states.get_mut(split_id).unwrap();
            match info {
                BackfillInfo::NoDataToBackfill => {
                    state.state = BackfillState::Finished;
                }
                BackfillInfo::HasDataToBackfill { latest_offset } => {
                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
                    state.target_offset = Some(latest_offset.clone());
                }
            }
        }
        tracing::debug!(?backfill_stage, "source backfill started");

        fn select_strategy(_: &mut ()) -> PollNext {
            futures::stream::PollNext::Left
        }

        // We choose the "prefer upstream" strategy here, because:
        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
        //   Chunks from upstream are simply dropped, so there's not much overhead,
        //   and this can possibly also affect other running jobs less.
        // - When the upstream source becomes less busy, `SourceBackfill` can begin to catch up.
        let mut backfill_stream = select_with_strategy(
            input.by_ref().map(Either::Left),
            source_chunk_reader.map(Either::Right),
            select_strategy,
        );

        type PausedReader = Option<impl Stream>;
        let mut paused_reader: PausedReader = None;

        macro_rules! pause_reader {
            () => {
                if !pause_control.reader_paused {
                    let (left, right) = backfill_stream.into_inner();
                    backfill_stream = select_with_strategy(
                        left,
                        futures::stream::pending().boxed().map(Either::Right),
                        select_strategy,
                    );
                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
                    paused_reader = Some(right);
                    pause_control.reader_paused = true;
                }
            };
        }

447
448 macro_rules! resume_reader {
449 () => {
450 if pause_control.reader_paused {
451 backfill_stream = select_with_strategy(
452 input.by_ref().map(Either::Left),
453 paused_reader
454 .take()
455 .expect("should have paused reader to resume"),
456 select_strategy,
457 );
458 pause_control.reader_paused = false;
459 }
460 };
461 }
462
463 if pause_control.is_paused() {
464 pause_reader!();
465 }
466
467 let state_store = self
468 .backfill_state_store
469 .state_store()
470 .state_store()
471 .clone();
472 let table_id = self.backfill_state_store.state_store().table_id();
473 let mut state_table_initialized = false;
        {
            let source_backfill_row_count = self
                .metrics
                .source_backfill_row_count
                .with_guarded_label_values(&[
                    &self.source_id.to_string(),
                    &self.source_name,
                    &self.actor_ctx.id.to_string(),
                    &self.actor_ctx.fragment_id.to_string(),
                ]);

            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
            // milliseconds, considering some other latencies like network and cost in Meta.
            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                as u128
                * WAIT_BARRIER_MULTIPLE_TIMES;
            let mut last_barrier_time = Instant::now();

            // The main logic of the loop is in `handle_upstream_row` and `handle_backfill_row`.
            'backfill_loop: while let Some(either) = backfill_stream.next().await {
                match either {
                    // Upstream
                    Either::Left(msg) => {
                        let Ok(msg) = msg else {
                            let e = msg.unwrap_err();
                            tracing::error!(
                                error = ?e.as_report(),
                                source_id = %self.source_id,
                                "stream source reader error",
                            );
                            GLOBAL_ERROR_METRICS.user_source_error.report([
                                "SourceReaderError".to_owned(),
                                self.source_id.to_string(),
                                self.source_name.clone(),
                                self.actor_ctx.fragment_id.to_string(),
                            ]);

                            let (reader, _backfill_info) = self
                                .build_stream_source_reader(
                                    &source_desc,
                                    backfill_stage.get_latest_unfinished_splits()?,
                                )
                                .await?;

                            backfill_stream = select_with_strategy(
                                input.by_ref().map(Either::Left),
                                reader.map(Either::Right),
                                select_strategy,
                            );
                            continue;
                        };
                        match msg {
                            Message::Barrier(barrier) => {
                                last_barrier_time = Instant::now();

                                if pause_control.self_resume() {
                                    resume_reader!();
                                }

                                let mut maybe_mutation = None;
                                if let Some(ref mutation) = barrier.mutation.as_deref() {
                                    match mutation {
                                        Mutation::Pause => {
                                            // `pause_reader!` should not be invoked consecutively more than once.
                                            pause_control.command_pause();
                                            pause_reader!();
                                        }
                                        Mutation::Resume => {
                                            // `paused_reader.take()` should not be invoked consecutively more than once.
                                            if pause_control.command_resume() {
                                                resume_reader!();
                                            }
                                        }
                                        Mutation::StartFragmentBackfill { fragment_ids } => {
                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
                                                && pause_control.backfill_resume()
                                            {
                                                resume_reader!();
                                            }
                                        }
                                        Mutation::SourceChangeSplit(actor_splits) => {
                                            tracing::info!(
                                                actor_splits = ?actor_splits,
                                                "source change split received"
                                            );
                                            maybe_mutation = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| {
                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
                                                        target_splits,
                                                        should_trim_state: true,
                                                    }
                                                });
                                        }
                                        Mutation::Update(UpdateMutation {
                                            actor_splits, ..
                                        }) => {
                                            maybe_mutation = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| {
                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
                                                        target_splits,
                                                        should_trim_state: false,
                                                    }
                                                });
                                        }
                                        Mutation::ConnectorPropsChange(new_props) => {
                                            if let Some(props_plaintext) =
                                                new_props.get(&self.source_id.as_raw_id())
                                            {
                                                source_desc
                                                    .update_reader(props_plaintext.clone())?;

                                                maybe_mutation = Some(
                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
                                                );
                                            }
                                        }
                                        Mutation::Throttle(actor_to_apply) => {
                                            if let Some(new_rate_limit) =
                                                actor_to_apply.get(&self.actor_ctx.id)
                                                && *new_rate_limit != self.rate_limit_rps
                                            {
                                                tracing::info!(
                                                    "updating rate limit from {:?} to {:?}",
                                                    self.rate_limit_rps,
                                                    *new_rate_limit
                                                );
                                                self.rate_limit_rps = *new_rate_limit;
                                                // rebuild reader
                                                let (reader, _backfill_info) = self
                                                    .build_stream_source_reader(
                                                        &source_desc,
                                                        backfill_stage
                                                            .get_latest_unfinished_splits()?,
                                                    )
                                                    .await?;

                                                backfill_stream = select_with_strategy(
                                                    input.by_ref().map(Either::Left),
                                                    reader.map(Either::Right),
                                                    select_strategy,
                                                );
                                            }
                                        }
                                        _ => {}
                                    }
                                }
                                async fn rebuild_reader_on_split_changed(
                                    this: &SourceBackfillExecutorInner<impl StateStore>,
                                    backfill_stage: &BackfillStage,
                                    source_desc: &SourceDesc,
                                ) -> StreamExecutorResult<BoxSourceChunkStream>
                                {
                                    // rebuild backfill_stream
                                    // Note: we don't put this part in a method, due to some complex lifetime issues.

                                    let latest_unfinished_splits =
                                        backfill_stage.get_latest_unfinished_splits()?;
                                    tracing::info!(
                                        "actor {:?} apply source split change to {:?}",
                                        this.actor_ctx.id,
                                        latest_unfinished_splits
                                    );

                                    // Replace the source reader with a new one of the new state.
                                    let (reader, _backfill_info) = this
                                        .build_stream_source_reader(
                                            source_desc,
                                            latest_unfinished_splits,
                                        )
                                        .await?;

                                    Ok(reader)
                                }

                                self.backfill_state_store
                                    .set_states(backfill_stage.states.clone())
                                    .await?;
                                self.backfill_state_store.commit(barrier.epoch).await?;

                                if self.should_report_finished(&backfill_stage.states) {
                                    // drop the backfill kafka consumers
                                    backfill_stream = select_with_strategy(
                                        input.by_ref().map(Either::Left),
                                        futures::stream::pending().boxed().map(Either::Right),
                                        select_strategy,
                                    );

                                    self.progress.finish(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    let is_checkpoint = barrier.is_checkpoint();
                                    // yield barrier after reporting progress
                                    yield Message::Barrier(barrier);

                                    if let Some(to_apply_mutation) = maybe_mutation {
                                        self.apply_split_change_after_yield_barrier(
                                            barrier_epoch,
                                            &mut backfill_stage,
                                            to_apply_mutation,
                                        )
                                        .await?;
                                    }

                                    if !state_table_initialized {
                                        if is_checkpoint {
                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
                                            // We wait for the 2nd checkpoint epoch.
                                            let epoch = barrier_epoch.prev;
                                            tracing::info!("waiting for epoch: {}", epoch);
                                            state_store
                                                .try_wait_epoch(
                                                    HummockReadEpoch::Committed(epoch),
                                                    TryWaitEpochOptions { table_id },
                                                )
                                                .await?;
                                            tracing::info!("finished waiting for epoch: {}", epoch);
                                            state_table_initialized = true;
                                        }
                                    } else {
                                        // After we reported finished, we still don't exit the loop,
                                        // because we need to handle split migration.
                                        assert!(
                                            state_table_initialized,
                                            "state table should be initialized before checking backfill finished"
                                        );
                                        if self.backfill_finished(&backfill_stage.states).await? {
                                            tracing::info!("source backfill finished");
                                            break 'backfill_loop;
                                        }
                                    }
                                } else {
                                    self.progress.update_for_source_backfill(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    // yield barrier after reporting progress
                                    yield Message::Barrier(barrier);

                                    if let Some(to_apply_mutation) = maybe_mutation
                                        && self
                                            .apply_split_change_after_yield_barrier(
                                                barrier_epoch,
                                                &mut backfill_stage,
                                                to_apply_mutation,
                                            )
                                            .await?
                                    {
                                        let reader = rebuild_reader_on_split_changed(
                                            &self,
                                            &backfill_stage,
                                            &source_desc,
                                        )
                                        .await?;

                                        backfill_stream = select_with_strategy(
                                            input.by_ref().map(Either::Left),
                                            reader.map(Either::Right),
                                            select_strategy,
                                        );
                                    }
                                }
                            }
                            Message::Chunk(chunk) => {
                                // We need to iterate over all rows because there might be multiple splits in a chunk.
                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                                for (i, (_, row)) in chunk.rows().enumerate() {
                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                                    let vis = backfill_stage.handle_upstream_row(split, offset);
                                    new_vis.set(i, vis);
                                }
                                // Emit the chunk if the visibility map is not empty, i.e., some splits have finished backfilling.
                                let new_vis = new_vis.finish();
                                if new_vis.count_ones() != 0 {
                                    let new_chunk = chunk.clone_with_vis(new_vis);
                                    yield Message::Chunk(new_chunk);
                                }
                            }
                            Message::Watermark(_) => {
                                // Ignore watermark during backfill.
                            }
                        }
                    }
                    // backfill
                    Either::Right(msg) => {
                        let chunk = msg?;

                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                            // Pause to let barrier catch up via backpressure of snapshot stream.
                            pause_control.self_pause();
                            pause_reader!();

                            // The max wait barrier time has been exceeded, so the source will be paused.
                            // We can currently guarantee the source is not already paused,
                            // since it just received stream chunks.
                            tracing::warn!(
                                "source {} paused, wait barrier for {:?}",
                                self.info.identity,
                                last_barrier_time.elapsed()
                            );

                            // Only update `max_wait_barrier_time_ms` here to capture `barrier_interval_ms`
                            // changes, to avoid frequently accessing the shared `system_params`.
                            max_wait_barrier_time_ms =
                                self.system_params.load().barrier_interval_ms() as u128
                                    * WAIT_BARRIER_MULTIPLE_TIMES;
                        }
                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                        for (i, (_, row)) in chunk.rows().enumerate() {
                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
                            new_vis.set(i, vis);
                        }

                        let new_vis = new_vis.finish();
                        let card = new_vis.count_ones();
                        if card != 0 {
                            let new_chunk = chunk.clone_with_vis(new_vis);
                            yield Message::Chunk(new_chunk);
                            source_backfill_row_count.inc_by(card as u64);
                        }
                    }
                }
            }
        }

        std::mem::drop(backfill_stream);
        let mut states = backfill_stage.states;
        // Make sure `Finished` state is persisted.
        self.backfill_state_store.set_states(states.clone()).await?;

        // All splits finished backfilling. Now we only forward the source data.
        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Barrier(barrier) => {
                    let mut split_changed = None;
                    if let Some(ref mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause | Mutation::Resume => {
                                // We don't need to do anything. Handled by upstream.
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, true));
                            }
                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, false));
                            }
                            _ => {}
                        }
                    }
                    self.backfill_state_store.commit(barrier.epoch).await?;
                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((target_splits, state, should_trim_state)) = split_changed {
                        self.apply_split_change_forward_stage_after_yield_barrier(
                            barrier_epoch,
                            target_splits,
                            state,
                            should_trim_state,
                        )
                        .await?;
                    }
                }
                Message::Chunk(chunk) => {
                    yield Message::Chunk(chunk);
                }
                Message::Watermark(watermark) => {
                    yield Message::Watermark(watermark);
                }
            }
        }
    }

    /// Decides when we should call `progress.finish()` to let the blocking DDL return.
    /// We report as soon as every split reaches `SourceCachingUp` (or `Finished`); otherwise the
    /// DDL might be blocked forever until upstream messages come.
    ///
    /// Note: split migration (online scaling) is related with progress tracking.
    /// - For foreground DDL, scaling is not allowed before progress is finished.
    /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
    ///
    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
    fn should_report_finished(&self, states: &BackfillStates) -> bool {
        states.values().all(|state| {
            matches!(
                state.state,
                BackfillState::Finished | BackfillState::SourceCachingUp(_)
            )
        })
    }

    /// Whether all splits have entered the `Finished` state.
    ///
    /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
    /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
    /// this actor, we still need to backfill it.
    ///
    /// Note: at the beginning, the actor will only read the state written by itself.
    /// It needs to _wait until it can read all actors' written data_,
    /// i.e., wait until the second checkpoint is available.
    ///
    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
        Ok(states
            .values()
            .all(|state| matches!(state.state, BackfillState::Finished))
            && self
                .backfill_state_store
                .scan_may_stale()
                .await?
                .into_iter()
                .all(|state| matches!(state.state, BackfillState::Finished)))
    }

    /// For newly added splits, we do not need to backfill and can directly forward from upstream.
    async fn apply_split_change_after_yield_barrier(
        &mut self,
        barrier_epoch: EpochPair,
        stage: &mut BackfillStage,
        to_apply_mutation: ApplyMutationAfterBarrier,
    ) -> StreamExecutorResult<bool> {
        match to_apply_mutation {
            ApplyMutationAfterBarrier::SourceChangeSplit {
                target_splits,
                should_trim_state,
            } => {
                self.source_split_change_count.inc();
                // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
                self.update_state_if_changed(
                    barrier_epoch,
                    target_splits,
                    stage,
                    should_trim_state,
                )
                .await
            }
            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
        }
    }

    /// Returns `true` if split changed. Otherwise `false`.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        stage: &mut BackfillStage,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;
        // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
        // Will be set to target_state in the end.
        let old_states = std::mem::take(&mut stage.states);
        let committed_reader = self
            .backfill_state_store
            .new_committed_reader(barrier_epoch)
            .await?;
        // Iterate over the target (assigned) splits
        // - check if any new splits are added
        // - build target_state
        for split in &target_splits {
            let split_id = split.id();
            if let Some(s) = old_states.get(&split_id) {
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;

                let backfill_state = committed_reader
                    .try_recover_from_state_store(&split_id)
                    .await?;
                match backfill_state {
                    None => {
                        // Newly added split. We don't need to backfill.
                        // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
                        target_state.insert(
                            split_id,
                            BackfillStateWithProgress {
                                state: BackfillState::Finished,
                                num_consumed_rows: 0,
                                target_offset: None,
                            },
                        );
                    }
                    Some(backfill_state) => {
                        // Migrated split. Backfill if unfinished.
                        target_state.insert(split_id, backfill_state);
                    }
                }
            }
        }

        // Checks dropped splits
        for existing_split_id in old_states.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            // `stage.states` was taken above, so compute the dropped splits from `old_states`.
            let dropped_splits = old_states
                .keys()
                .filter(|split_id| !target_state.contains_key(*split_id))
                .cloned();

            if should_trim_state {
                // trim dropped splits' state
                self.backfill_state_store.trim_state(dropped_splits).await?;
            }
            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
        } else {
            debug_assert_eq!(old_states, target_state);
        }
        stage.states = target_state;
        stage.splits = target_splits;
        stage.debug_assert_consistent();
        Ok(split_changed)
    }

    /// For split change during the forward stage, all newly added splits should already be finished.
    /// We just need to update the state store if necessary.
    async fn apply_split_change_forward_stage_after_yield_barrier(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        states: &mut BackfillStates,
        should_trim_state: bool,
    ) -> StreamExecutorResult<()> {
        self.source_split_change_count.inc();
        self.update_state_if_changed_forward_stage(
            barrier_epoch,
            target_splits,
            states,
            should_trim_state,
        )
        .await?;

        Ok(())
    }

    async fn update_state_if_changed_forward_stage(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        states: &mut BackfillStates,
        should_trim_state: bool,
    ) -> StreamExecutorResult<()> {
        let target_splits: HashSet<SplitId> =
            target_splits.into_iter().map(|split| split.id()).collect();

        let mut split_changed = false;
        let mut newly_added_splits = vec![];

        let committed_reader = self
            .backfill_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Checks added splits
        for split_id in &target_splits {
            if !states.contains_key(split_id) {
                split_changed = true;

                let backfill_state = committed_reader
                    .try_recover_from_state_store(split_id)
                    .await?;
                match &backfill_state {
                    None => {
                        // Newly added split. We don't need to backfill!
                        newly_added_splits.push(split_id.clone());
                    }
                    Some(backfill_state) => {
                        // Migrated split. It should also be finished since we are in the forwarding stage.
                        match backfill_state.state {
                            BackfillState::Finished => {}
                            _ => {
                                return Err(anyhow::anyhow!(
                                    "Unexpected backfill state: {:?}",
                                    backfill_state
                                )
                                .into());
                            }
                        }
                    }
                }
                states.insert(
                    split_id.clone(),
                    backfill_state.unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Finished,
                        num_consumed_rows: 0,
                        target_offset: None,
                    }),
                );
            }
        }

        // Checks dropped splits
        for existing_split_id in states.keys() {
            if !target_splits.contains(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                target_splits = ?target_splits,
                "apply split change"
            );

            let dropped_splits =
                states.extract_if(|split_id, _| !target_splits.contains(split_id));

            if should_trim_state {
                // trim dropped splits' state
                self.backfill_state_store
                    .trim_state(dropped_splits.map(|(k, _v)| k))
                    .await?;
            }

            // For migrated and existing splits, we don't need to update the state store;
            // only newly added splits need it.
            self.backfill_state_store
                .set_states(
                    newly_added_splits
                        .into_iter()
                        .map(|split_id| {
                            (
                                split_id,
                                BackfillStateWithProgress {
                                    state: BackfillState::Finished,
                                    num_consumed_rows: 0,
                                    target_offset: None,
                                },
                            )
                        })
                        .collect(),
                )
                .await?;
        }

        Ok(())
    }
}

fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let a = a.parse::<i64>().unwrap();
    let b = b.parse::<i64>().unwrap();
    a.cmp(&b)
}

impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.inner.execute(self.input).boxed()
    }
}

impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceBackfillExecutor")
            .field("source_id", &self.source_id)
            .field("column_ids", &self.column_ids)
            .field("stream_key", &self.info.stream_key)
            .finish()
    }
}

struct PauseControl {
    // Paused due to backfill order control
    backfill_paused: bool,
    // Paused due to self-pause, e.g. to let the barrier catch up
    self_paused: bool,
    // Paused due to a `Pause` command from meta, or `pause_on_next_bootstrap`
    command_paused: bool,
    // Whether the underlying reader stream is currently paused
    reader_paused: bool,
}

impl PauseControl {
    fn new() -> Self {
        Self {
            backfill_paused: false,
            self_paused: false,
            command_paused: false,
            reader_paused: false,
        }
    }

    fn is_paused(&self) -> bool {
        self.backfill_paused || self.command_paused || self.self_paused
    }

    /// Marks the executor as paused by backfill order control.
    /// The caller should pause the reader afterwards.
    fn backfill_pause(&mut self) {
        if self.backfill_paused {
            tracing::warn!("backfill_pause invoked twice");
        }
        self.backfill_paused = true;
    }

    /// Returns whether we need to resume the reader.
    /// Same precedence as command.
    fn backfill_resume(&mut self) -> bool {
        if !self.backfill_paused {
            tracing::warn!("backfill_resume invoked twice");
        }
        self.backfill_paused = false;
        !self.command_paused
    }

    /// Marks the executor as self-paused (e.g. to let the barrier catch up).
    /// The caller should pause the reader afterwards.
    fn self_pause(&mut self) {
        assert!(
            !self.backfill_paused,
            "backfill stream should not be read when backfill_pause is set"
        );
        assert!(
            !self.command_paused,
            "backfill stream should not be read when command_pause is set"
        );
        if self.self_paused {
            tracing::warn!("self_pause invoked twice");
        }
        self.self_paused = true;
    }

    /// Returns whether we need to resume the reader.
    /// `self_resume` has the lowest precedence:
    /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
    fn self_resume(&mut self) -> bool {
        self.self_paused = false;
        !(self.backfill_paused || self.command_paused)
    }

    /// Marks the executor as paused by a `Pause` command from meta.
    /// The caller should pause the reader afterwards.
    fn command_pause(&mut self) {
        if self.command_paused {
            tracing::warn!("command_pause invoked twice");
        }
        self.command_paused = true;
    }

    /// Returns whether we need to resume the reader.
    /// Same precedence as backfill.
    fn command_resume(&mut self) -> bool {
        if !self.command_paused {
            tracing::warn!("command_resume invoked twice");
        }
        self.command_paused = false;
        !self.backfill_paused
    }
}
1262}