// risingwave_stream/executor/source/source_backfill_executor.rs
1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::time::Instant;
18
19use anyhow::anyhow;
20use either::Either;
21use futures::stream::{PollNext, select_with_strategy};
22use itertools::Itertools;
23use risingwave_common::bitmap::BitmapBuilder;
24use risingwave_common::catalog::ColumnId;
25use risingwave_common::id::SourceId;
26use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
27use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
28use risingwave_common::system_param::reader::SystemParamsRead;
29use risingwave_common::types::JsonbVal;
30use risingwave_common::util::epoch::EpochPair;
31use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
32use risingwave_connector::source::{
33 BackfillInfo, BoxSourceReaderEventStream, SourceContext, SourceCtrlOpts, SourceReaderEvent,
34 SplitId, SplitImpl, SplitMetaData,
35};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::common::ThrottleType;
38use risingwave_storage::store::TryWaitEpochOptions;
39use serde::{Deserialize, Serialize};
40use thiserror_ext::AsReport;
41
42use super::executor_core::StreamSourceCore;
43use super::source_backfill_state_table::BackfillStateTableHandler;
44use super::{apply_rate_limit_to_source_reader_event, get_split_offset_col_idx};
45use crate::common::rate_limit::limited_chunk_size;
46use crate::executor::UpdateMutation;
47use crate::executor::prelude::*;
48use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
49use crate::task::CreateMviewProgressReporter;
50
/// Per-split backfill progress state machine.
///
/// Observed transitions in this file: `Backfilling` -> `Finished`,
/// `Backfilling` -> `SourceCachingUp` -> `Finished`
/// (see `BackfillStage::handle_upstream_row` / `handle_backfill_row`).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
    /// `None` means not started yet. It's the initial state.
    /// XXX: perhaps we can also set to low-watermark instead of `None`
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
    SourceCachingUp(String),
    /// Backfill for this split is done; upstream rows are forwarded as-is.
    Finished,
}
/// Map from split id to its backfill progress; the unit persisted in the backfill state table.
pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;

/// Only `state` field is the real state for fail-over.
/// Other fields are for observability (but we still need to persist them).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct BackfillStateWithProgress {
    pub state: BackfillState,
    /// Number of rows consumed from the backfill reader for this split (observability only).
    pub num_consumed_rows: u64,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
    /// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
    /// so that we can finish backfilling even when upstream doesn't emit any data.
    pub target_offset: Option<String>,
}
73
74impl BackfillStateWithProgress {
75 pub fn encode_to_json(self) -> JsonbVal {
76 serde_json::to_value(self).unwrap().into()
77 }
78
79 pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
80 serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
81 }
82}
83
/// Thin wrapper pairing the backfill logic (`inner`) with its upstream input executor.
pub struct SourceBackfillExecutor<S: StateStore> {
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
    pub input: Executor,
}
89
/// Core of the source backfill executor: reads historical data from the external
/// connector while merging it with the upstream source stream (see `execute`).
pub struct SourceBackfillExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Streaming source for external
    source_id: SourceId,
    source_name: String,
    column_ids: Vec<ColumnId>,
    /// Taken exactly once in `execute` to build the `SourceDesc`; `None` afterwards.
    source_desc_builder: Option<SourceDescBuilder>,
    backfill_state_store: BackfillStateTableHandler<S>,

    /// Metrics for monitor.
    metrics: Arc<StreamingMetrics>,
    source_split_change_count: LabelGuardedIntCounter,

    // /// Receiver of barrier channel.
    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
    /// System parameter reader to read barrier interval
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    /// Reporter for blocking-DDL backfill progress
    /// (`finish` / `update_for_source_backfill` are called in `execute`).
    progress: CreateMviewProgressReporter,
}
115
/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
#[derive(Debug)]
struct BackfillStage {
    /// Per-split backfill progress; keys mirror `splits` (see `debug_assert_consistent`).
    states: BackfillStates,
    /// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}
129
/// Mutations whose effects must be applied only *after* the corresponding barrier
/// has been yielded downstream (see `apply_split_change_after_yield_barrier`).
enum ApplyMutationAfterBarrier {
    SourceChangeSplit {
        target_splits: Vec<SplitImpl>,
        /// NOTE(review): presumably controls whether state for splits no longer
        /// owned is trimmed from the state table — confirm in `update_state_if_changed`.
        should_trim_state: bool,
    },
    ConnectorPropsChange,
}
137
impl BackfillStage {
    /// Total number of rows consumed from the backfill side across all splits.
    /// Used for progress reporting.
    fn total_backfilled_rows(&self) -> u64 {
        self.states.values().map(|s| s.num_consumed_rows).sum()
    }

    /// Debug-only invariant check: `states` and `splits` must describe exactly the
    /// same set of split ids (see the struct-level note about safe `unwrap()`).
    fn debug_assert_consistent(&self) {
        if cfg!(debug_assertions) {
            let all_splits: HashSet<_> = self.splits.iter().map(|split| split.id()).collect();
            assert_eq!(
                self.states.keys().cloned().collect::<HashSet<_>>(),
                all_splits
            );
        }
    }

    /// Get unfinished splits with latest offsets according to the backfill states.
    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
        let mut unfinished_splits = Vec::new();
        for split in &self.splits {
            // Safe unwrap: `states` covers every split in `splits` (struct invariant).
            let state = &self.states.get(split.id().as_ref()).unwrap().state;
            match state {
                BackfillState::Backfilling(Some(offset)) => {
                    // Resume reading from the last backfilled offset.
                    let mut updated_split = split.clone();
                    updated_split.update_in_place(offset.clone())?;
                    unfinished_splits.push(updated_split);
                }
                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
            }
        }
        Ok(unfinished_splits)
    }

    /// Updates backfill states and `target_offsets` from upstream `SourceExecutor` chunks, and
    /// returns whether the row is visible.
    ///
    /// Note that the online catch-up logic intentionally relies on the upstream chunk stream
    /// carrying split/offset columns. We do not reconcile live upstream progress from source state
    /// tables here, because source backfill is designed to advance according to the upstream data
    /// stream instead of a side channel.
    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
        let mut vis = false;
        // Safe unwrap: `states` covers all assigned splits (struct invariant).
        let state = self.states.get_mut(split_id).unwrap();
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(None) => {
                // backfilling for this split is not started yet. Ignore this row
            }
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // continue backfilling. Ignore this row
                    }
                    Ordering::Equal => {
                        // backfilling for this split is finished just right.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // backfilling for this split produced more data than current source's progress.
                        // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
                    }
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // Source caught up, but doesn't contain the last backfilled row.
                        // This may happen e.g., if Kafka performed compaction.
                        vis = true;
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Equal => {
                        // Source just caught up with backfilling.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // Source is still behind backfilling.
                    }
                }
            }
            BackfillState::Finished => {
                vis = true;
                // This split's backfilling is finished, we are waiting for other splits
            }
        }
        if matches!(state_inner, BackfillState::Backfilling(_)) {
            // Record the upstream's latest seen offset so the backfill side knows when to stop.
            state.target_offset = Some(offset.to_owned());
        }
        if vis {
            // A row is only emitted from upstream once this split is fully caught up.
            debug_assert_eq!(*state_inner, BackfillState::Finished);
        }
        vis
    }

    /// Updates backfill states and returns whether the row backfilled from external system is visible.
    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
        // Safe unwrap: `states` covers all assigned splits (struct invariant).
        let state = self.states.get_mut(split_id).unwrap();
        state.num_consumed_rows += 1;
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up
                    // and dropping duplicated messages.
                    // But it's not true if target_offset is fetched from other places, like Kafka high watermark.
                    // In this case, upstream hasn't reached the target_offset yet.
                    //
                    // Note2: after this, all following rows in the current chunk will be invisible.
                    //
                    // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
                    // keep backfilling.
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
                true
            }
            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
                // backfilling stopped. ignore
                false
            }
        }
    }

    /// Updates backfill state with split progress emitted by the backfill reader when there are no
    /// corresponding visible rows.
    ///
    /// This is needed for connectors such as Kafka under `read_committed`, where the reader can
    /// reach a later offset because of EOF/control records without yielding a data chunk. This
    /// supplements `handle_backfill_row`, but does not replace the upstream-chunk-based progress
    /// tracking in `handle_upstream_row`.
    fn handle_backfill_progress(&mut self, split_id: &str, offset: &str) {
        // Unlike the row handlers, tolerate unknown split ids here (no unwrap):
        // progress events may mention splits this stage does not track.
        let Some(state) = self.states.get_mut(split_id) else {
            return;
        };
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                // Only move the cached-up offset forward, never backward.
                if compare_kafka_offset(backfill_offset, offset).is_lt() {
                    *backfill_offset = offset.to_owned();
                }
            }
            BackfillState::Finished => {}
        }
    }
}
296
297impl<S: StateStore> SourceBackfillExecutorInner<S> {
298 #[expect(clippy::too_many_arguments)]
299 pub fn new(
300 actor_ctx: ActorContextRef,
301 info: ExecutorInfo,
302 stream_source_core: StreamSourceCore<S>,
303 metrics: Arc<StreamingMetrics>,
304 system_params: SystemParamsReaderRef,
305 backfill_state_store: BackfillStateTableHandler<S>,
306 rate_limit_rps: Option<u32>,
307 progress: CreateMviewProgressReporter,
308 ) -> Self {
309 let StreamSourceCore {
310 source_id,
311 source_name,
312 column_ids,
313 source_desc_builder,
314 ..
315 } = stream_source_core;
316 let source_split_change_count = metrics
317 .source_split_change_count
318 .with_guarded_label_values(&[
319 &source_id.to_string(),
320 &source_name,
321 &actor_ctx.id.to_string(),
322 &actor_ctx.fragment_id.to_string(),
323 ]);
324
325 Self {
326 actor_ctx,
327 info,
328 source_id,
329 source_name,
330 column_ids,
331 source_desc_builder,
332 backfill_state_store,
333 metrics,
334 source_split_change_count,
335 system_params,
336 rate_limit_rps,
337 progress,
338 }
339 }
340
341 async fn build_stream_source_reader(
342 &self,
343 source_desc: &SourceDesc,
344 splits: Vec<SplitImpl>,
345 ) -> StreamExecutorResult<(BoxSourceReaderEventStream, HashMap<SplitId, BackfillInfo>)> {
346 let column_ids = source_desc
347 .columns
348 .iter()
349 .map(|column_desc| column_desc.column_id)
350 .collect_vec();
351 let source_ctx = SourceContext::new(
352 self.actor_ctx.id,
353 self.source_id,
354 self.actor_ctx.fragment_id,
355 self.source_name.clone(),
356 source_desc.metrics.clone(),
357 SourceCtrlOpts {
358 chunk_size: limited_chunk_size(self.rate_limit_rps),
359 split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
360 },
361 source_desc.source.config.clone(),
362 None,
363 );
364
365 // We will check watermark to decide whether we need to backfill.
366 // e.g., when there's a Kafka topic-partition without any data,
367 // we don't need to backfill at all. But if we do not check here,
368 // the executor can only know it's finished when data coming in.
369 // For blocking DDL, this would be annoying.
370
371 let (stream, res) = source_desc
372 .source
373 .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
374 .await
375 .map_err(StreamExecutorError::connector_error)?;
376 Ok((
377 apply_rate_limit_to_source_reader_event(stream, self.rate_limit_rps).boxed(),
378 res.backfill_info,
379 ))
380 }
381
    /// Main loop of the executor, in two phases:
    ///
    /// 1. **Backfill stage** (`'backfill_loop`): merge the upstream source stream
    ///    (Left, preferred) with a reader over the external system (Right), advancing
    ///    the per-split `BackfillState`s until every split is `Finished`.
    /// 2. **Forward stage**: drop the backfill reader and simply forward upstream
    ///    messages, still handling split-change mutations on barriers.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute(mut self, input: Executor) {
        let mut input = input.execute();

        // Poll the upstream to get the first barrier.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        let owned_splits = barrier
            .initial_split_assignment(self.actor_ctx.id)
            .unwrap_or(&[])
            .to_vec();

        let mut pause_control = PauseControl::new();
        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
            pause_control.backfill_pause();
        }
        if barrier.is_pause_on_startup() {
            pause_control.command_pause();
        }
        yield Message::Barrier(barrier);

        // Taken exactly once; `source_desc_builder` is `None` afterwards.
        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // source backfill only applies to kafka, so we don't need to get pulsar's `message_id_data_idx`.
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        self.backfill_state_store.init_epoch(first_epoch).await?;

        // Recover per-split backfill states from the state table; splits never seen
        // before start in `Backfilling(None)`.
        let mut backfill_states: BackfillStates = HashMap::new();
        {
            let committed_reader = self
                .backfill_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for split in &owned_splits {
                let split_id = split.id();
                let backfill_state = committed_reader
                    .try_recover_from_state_store(&split_id)
                    .await?
                    .unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Backfilling(None),
                        num_consumed_rows: 0,
                        target_offset: None, // init with None
                    });
                backfill_states.insert(split_id, backfill_state);
            }
        }
        let mut backfill_stage = BackfillStage {
            states: backfill_states,
            splits: owned_splits,
        };
        backfill_stage.debug_assert_consistent();

        let (source_chunk_reader, backfill_info) = self
            .build_stream_source_reader(
                &source_desc,
                backfill_stage.get_latest_unfinished_splits()?,
            )
            .instrument_await("source_build_reader")
            .await?;
        // Seed target offsets (or finish immediately) from the connector's report,
        // so empty partitions don't block blocking DDL forever.
        for (split_id, info) in &backfill_info {
            let state = backfill_stage.states.get_mut(split_id).unwrap();
            match info {
                BackfillInfo::NoDataToBackfill => {
                    state.state = BackfillState::Finished;
                }
                BackfillInfo::HasDataToBackfill { latest_offset } => {
                    // Note: later we will override it with the offset from the source message, and it's possible to become smaller than this value.
                    state.target_offset = Some(latest_offset.clone());
                }
            }
        }
        tracing::debug!(?backfill_stage, "source backfill started");

        fn select_strategy(_: &mut ()) -> PollNext {
            futures::stream::PollNext::Left
        }

        // We choose "preferring upstream" strategy here, because:
        // - When the upstream source's workload is high (i.e., Kafka has new incoming data), it just makes backfilling slower.
        //   For chunks from upstream, they are simply dropped, so there's no much overhead.
        //   So possibly this can also affect other running jobs less.
        // - When the upstream Source's becomes less busy, SourceBackfill can begin to catch up.
        let mut backfill_stream = select_with_strategy(
            input.by_ref().map(Either::Left),
            source_chunk_reader.map(Either::Right),
            select_strategy,
        );

        type PausedReader = Option<impl Stream>;
        let mut paused_reader: PausedReader = None;

        // Swap the Right side of the merged stream for a pending stream, parking the
        // real reader in `paused_reader`. Idempotent via `reader_paused`.
        macro_rules! pause_reader {
            () => {
                if !pause_control.reader_paused {
                    let (left, right) = backfill_stream.into_inner();
                    backfill_stream = select_with_strategy(
                        left,
                        futures::stream::pending().boxed().map(Either::Right),
                        select_strategy,
                    );
                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
                    paused_reader = Some(right);
                    pause_control.reader_paused = true;
                }
            };
        }

        // Inverse of `pause_reader!`: reattach the parked reader.
        macro_rules! resume_reader {
            () => {
                if pause_control.reader_paused {
                    backfill_stream = select_with_strategy(
                        input.by_ref().map(Either::Left),
                        paused_reader
                            .take()
                            .expect("should have paused reader to resume"),
                        select_strategy,
                    );
                    pause_control.reader_paused = false;
                }
            };
        }

        if pause_control.is_paused() {
            pause_reader!();
        }

        let state_store = self
            .backfill_state_store
            .state_store()
            .state_store()
            .clone();
        let table_id = self.backfill_state_store.state_store().table_id();
        // Becomes true once a checkpoint after finishing is committed, meaning this
        // actor can safely read all actors' persisted backfill states.
        let mut state_table_initialized = false;
        {
            let source_backfill_row_count = self
                .metrics
                .source_backfill_row_count
                .with_guarded_label_values(&[
                    &self.source_id.to_string(),
                    &self.source_name,
                    &self.actor_ctx.id.to_string(),
                    &self.actor_ctx.fragment_id.to_string(),
                ]);

            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
            // milliseconds, considering some other latencies like network and cost in Meta.
            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                as u128
                * WAIT_BARRIER_MULTIPLE_TIMES;
            let mut last_barrier_time = Instant::now();

            // The main logic of the loop is in handle_upstream_row and handle_backfill_row.
            'backfill_loop: while let Some(either) = backfill_stream.next().await {
                match either {
                    // Upstream
                    Either::Left(msg) => {
                        // NOTE(review): errors arriving on the upstream (Left) side are
                        // logged as "stream source reader error" and trigger a rebuild of
                        // the *backfill* reader, while errors from the backfill reader
                        // (Right side below) are propagated fatally via `?`. Confirm this
                        // error routing is intended.
                        let Ok(msg) = msg else {
                            let e = msg.unwrap_err();
                            tracing::error!(
                                error = ?e.as_report(),
                                source_id = %self.source_id,
                                "stream source reader error",
                            );
                            GLOBAL_ERROR_METRICS.user_source_error.report([
                                "SourceReaderError".to_owned(),
                                self.source_id.to_string(),
                                self.source_name.clone(),
                                self.actor_ctx.fragment_id.to_string(),
                            ]);

                            let (reader, _backfill_info) = self
                                .build_stream_source_reader(
                                    &source_desc,
                                    backfill_stage.get_latest_unfinished_splits()?,
                                )
                                .await?;

                            backfill_stream = select_with_strategy(
                                input.by_ref().map(Either::Left),
                                reader.map(Either::Right),
                                select_strategy,
                            );
                            continue;
                        };
                        match msg {
                            Message::Barrier(barrier) => {
                                last_barrier_time = Instant::now();

                                if pause_control.self_resume() {
                                    resume_reader!();
                                }

                                // Mutation to apply only after this barrier is yielded.
                                // (NOTE: "muatation" is a pre-existing typo for "mutation".)
                                let mut maybe_muatation = None;
                                if let Some(ref mutation) = barrier.mutation.as_deref() {
                                    match mutation {
                                        Mutation::Pause => {
                                            // pause_reader should not be invoked consecutively more than once.
                                            pause_control.command_pause();
                                            pause_reader!();
                                        }
                                        Mutation::Resume => {
                                            // pause_reader.take should not be invoked consecutively more than once.
                                            if pause_control.command_resume() {
                                                resume_reader!();
                                            }
                                        }
                                        Mutation::StartFragmentBackfill { fragment_ids } => {
                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
                                                && pause_control.backfill_resume()
                                            {
                                                resume_reader!();
                                            }
                                        }
                                        Mutation::SourceChangeSplit(actor_splits) => {
                                            tracing::info!(
                                                actor_splits = ?actor_splits,
                                                "source change split received"
                                            );
                                            maybe_muatation = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| {
                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
                                                        target_splits,
                                                        should_trim_state: true,
                                                    }
                                                });
                                        }
                                        Mutation::Update(UpdateMutation {
                                            actor_splits, ..
                                        }) => {
                                            maybe_muatation = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| {
                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
                                                        target_splits,
                                                        should_trim_state: false,
                                                    }
                                                });
                                        }
                                        Mutation::ConnectorPropsChange(maybe_mutation) => {
                                            if let Some(props_plaintext) =
                                                maybe_mutation.get(&self.source_id.as_raw_id())
                                            {
                                                source_desc
                                                    .update_reader(props_plaintext.clone())?;

                                                maybe_muatation = Some(
                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
                                                );
                                            }
                                        }
                                        Mutation::Throttle(fragment_to_apply) => {
                                            if let Some(entry) =
                                                fragment_to_apply.get(&self.actor_ctx.fragment_id)
                                                && entry.throttle_type() == ThrottleType::Backfill
                                                && entry.rate_limit != self.rate_limit_rps
                                            {
                                                tracing::info!(
                                                    "updating rate limit from {:?} to {:?}",
                                                    self.rate_limit_rps,
                                                    entry.rate_limit
                                                );
                                                self.rate_limit_rps = entry.rate_limit;
                                                // rebuild reader
                                                let (reader, _backfill_info) = self
                                                    .build_stream_source_reader(
                                                        &source_desc,
                                                        backfill_stage
                                                            .get_latest_unfinished_splits()?,
                                                    )
                                                    .await?;

                                                backfill_stream = select_with_strategy(
                                                    input.by_ref().map(Either::Left),
                                                    reader.map(Either::Right),
                                                    select_strategy,
                                                );
                                            }
                                        }
                                        _ => {}
                                    }
                                }
                                async fn rebuild_reader_on_split_changed(
                                    this: &SourceBackfillExecutorInner<impl StateStore>,
                                    backfill_stage: &BackfillStage,
                                    source_desc: &SourceDesc,
                                ) -> StreamExecutorResult<BoxSourceReaderEventStream>
                                {
                                    // rebuild backfill_stream
                                    // Note: we don't put this part in a method, due to some complex lifetime issues.

                                    let latest_unfinished_splits =
                                        backfill_stage.get_latest_unfinished_splits()?;
                                    tracing::info!(
                                        "actor {:?} apply source split change to {:?}",
                                        this.actor_ctx.id,
                                        latest_unfinished_splits
                                    );

                                    // Replace the source reader with a new one of the new state.
                                    let (reader, _backfill_info) = this
                                        .build_stream_source_reader(
                                            source_desc,
                                            latest_unfinished_splits,
                                        )
                                        .await?;

                                    Ok(reader)
                                }

                                // Persist and commit the backfill states on every barrier.
                                self.backfill_state_store
                                    .set_states(backfill_stage.states.clone())
                                    .await?;
                                self.backfill_state_store.commit(barrier.epoch).await?;

                                if self.should_report_finished(&backfill_stage.states) {
                                    // drop the backfill kafka consumers
                                    backfill_stream = select_with_strategy(
                                        input.by_ref().map(Either::Left),
                                        futures::stream::pending().boxed().map(Either::Right),
                                        select_strategy,
                                    );

                                    self.progress.finish(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    let is_checkpoint = barrier.is_checkpoint();
                                    // yield barrier after reporting progress
                                    yield Message::Barrier(barrier);

                                    if let Some(to_apply_mutation) = maybe_muatation {
                                        self.apply_split_change_after_yield_barrier(
                                            barrier_epoch,
                                            &mut backfill_stage,
                                            to_apply_mutation,
                                        )
                                        .await?;
                                    }

                                    if !state_table_initialized {
                                        if is_checkpoint {
                                            // This is for self.backfill_finished() to be safe: wait until this actor can read all actors' written data.
                                            // We wait for 2nd epoch
                                            let epoch = barrier_epoch.prev;
                                            tracing::info!("waiting for epoch: {}", epoch);
                                            state_store
                                                .try_wait_epoch(
                                                    HummockReadEpoch::Committed(epoch),
                                                    TryWaitEpochOptions { table_id },
                                                )
                                                .await?;
                                            tracing::info!("finished waiting for epoch: {}", epoch);
                                            state_table_initialized = true;
                                        }
                                    } else {
                                        // After we reported finished, we still don't exit the loop.
                                        // Because we need to handle split migration.
                                        assert!(
                                            state_table_initialized,
                                            "state table should be initialized before checking backfill finished"
                                        );
                                        if self.backfill_finished(&backfill_stage.states).await? {
                                            tracing::info!("source backfill finished");
                                            break 'backfill_loop;
                                        }
                                    }
                                } else {
                                    self.progress.update_for_source_backfill(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    // yield barrier after reporting progress
                                    yield Message::Barrier(barrier);

                                    if let Some(to_apply_mutation) = maybe_muatation
                                        && self
                                            .apply_split_change_after_yield_barrier(
                                                barrier_epoch,
                                                &mut backfill_stage,
                                                to_apply_mutation,
                                            )
                                            .await?
                                    {
                                        let reader = rebuild_reader_on_split_changed(
                                            &self,
                                            &backfill_stage,
                                            &source_desc,
                                        )
                                        .await?;

                                        backfill_stream = select_with_strategy(
                                            input.by_ref().map(Either::Left),
                                            reader.map(Either::Right),
                                            select_strategy,
                                        );
                                    }
                                }
                            }
                            Message::Chunk(chunk) => {
                                // We need to iterate over all rows because there might be multiple splits in a chunk.
                                // Note: We assume offset from the source is monotonically increasing for the algorithm to work correctly.
                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                                for (i, (_, row)) in chunk.rows().enumerate() {
                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                                    let vis = backfill_stage.handle_upstream_row(split, offset);
                                    new_vis.set(i, vis);
                                }
                                // emit chunk if vis is not empty. i.e., some splits finished backfilling.
                                let new_vis = new_vis.finish();
                                if new_vis.count_ones() != 0 {
                                    let new_chunk = chunk.clone_with_vis(new_vis);
                                    yield Message::Chunk(new_chunk);
                                }
                            }
                            Message::Watermark(_) => {
                                // Ignore watermark during backfill.
                            }
                        }
                    }
                    // backfill
                    Either::Right(msg) => {
                        let chunk = match msg? {
                            SourceReaderEvent::DataChunk(chunk) => chunk,
                            SourceReaderEvent::SplitProgress(split_progress) => {
                                // `SplitProgress` is consumed only from the backfill reader side.
                                // The upstream side of source backfill still advances based on
                                // normal chunks with split/offset columns.
                                for (split_id, offset) in split_progress {
                                    let previous_state =
                                        backfill_stage.states.get(&split_id).cloned();
                                    backfill_stage
                                        .handle_backfill_progress(split_id.as_ref(), &offset);
                                    let updated_state =
                                        backfill_stage.states.get(&split_id).cloned();
                                    if previous_state != updated_state {
                                        tracing::info!(
                                            actor_id = %self.actor_ctx.id,
                                            fragment_id = %self.actor_ctx.fragment_id,
                                            source_id = %self.source_id,
                                            source_name = self.source_name,
                                            split_id = %split_id,
                                            applied_offset = %offset,
                                            previous_state = ?previous_state,
                                            updated_state = ?updated_state,
                                            "source backfill executor applied split progress offset"
                                        );
                                    }
                                }
                                continue;
                            }
                        };

                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                            // Pause to let barrier catch up via backpressure of snapshot stream.
                            pause_control.self_pause();
                            pause_reader!();

                            // Exceeds the max wait barrier time, the source will be paused.
                            // Currently we can guarantee the
                            // source is not paused since it received stream
                            // chunks.
                            tracing::warn!(
                                "source {} paused, wait barrier for {:?}",
                                self.info.identity,
                                last_barrier_time.elapsed()
                            );

                            // Only update `max_wait_barrier_time_ms` to capture
                            // `barrier_interval_ms`
                            // changes here to avoid frequently accessing the shared
                            // `system_params`.
                            max_wait_barrier_time_ms =
                                self.system_params.load().barrier_interval_ms() as u128
                                    * WAIT_BARRIER_MULTIPLE_TIMES;
                        }
                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                        for (i, (_, row)) in chunk.rows().enumerate() {
                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
                            new_vis.set(i, vis);
                        }

                        let new_vis = new_vis.finish();
                        let card = new_vis.count_ones();
                        if card != 0 {
                            let new_chunk = chunk.clone_with_vis(new_vis);
                            yield Message::Chunk(new_chunk);
                            source_backfill_row_count.inc_by(card as u64);
                        }
                    }
                }
            }
        }

        std::mem::drop(backfill_stream);
        let mut states = backfill_stage.states;
        // Make sure `Finished` state is persisted.
        self.backfill_state_store.set_states(states.clone()).await?;

        // All splits finished backfilling. Now we only forward the source data.
        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Barrier(barrier) => {
                    let mut split_changed = None;
                    if let Some(ref mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause | Mutation::Resume => {
                                // We don't need to do anything. Handled by upstream.
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, true));
                            }
                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, false));
                            }
                            _ => {}
                        }
                    }
                    self.backfill_state_store.commit(barrier.epoch).await?;
                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    // Split changes are applied only after the barrier is yielded,
                    // same as in the backfill stage.
                    if let Some((target_splits, state, should_trim_state)) = split_changed {
                        self.apply_split_change_forward_stage_after_yield_barrier(
                            barrier_epoch,
                            target_splits,
                            state,
                            should_trim_state,
                        )
                        .await?;
                    }
                }
                Message::Chunk(chunk) => {
                    yield Message::Chunk(chunk);
                }
                Message::Watermark(watermark) => {
                    yield Message::Watermark(watermark);
                }
            }
        }
    }
953
954 /// When we should call `progress.finish()` to let blocking DDL return.
955 /// We report as soon as `SourceCachingUp`. Otherwise the DDL might be blocked forever until upstream messages come.
956 ///
957 /// Note: split migration (online scaling) is related with progress tracking.
958 /// - For foreground DDL, scaling is not allowed before progress is finished.
959 /// - For background DDL, scaling is skipped when progress is not finished, and can be triggered by recreating actors during recovery.
960 ///
961 /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
962 fn should_report_finished(&self, states: &BackfillStates) -> bool {
963 states.values().all(|state| {
964 matches!(
965 state.state,
966 BackfillState::Finished | BackfillState::SourceCachingUp(_)
967 )
968 })
969 }
970
971 /// All splits entered `Finished` state.
972 ///
973 /// We check all splits for the source, including other actors' splits here, before going to the forward stage.
974 /// Otherwise if we `break` early, but after rescheduling, an unfinished split is migrated to
975 /// this actor, we still need to backfill it.
976 ///
977 /// Note: at the beginning, the actor will only read the state written by itself.
978 /// It needs to _wait until it can read all actors' written data_.
979 /// i.e., wait for the second checkpoint has been available.
980 ///
981 /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
982 async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
983 Ok(states
984 .values()
985 .all(|state| matches!(state.state, BackfillState::Finished))
986 && self
987 .backfill_state_store
988 .scan_may_stale()
989 .await?
990 .into_iter()
991 .all(|state| matches!(state.state, BackfillState::Finished)))
992 }
993
994 /// For newly added splits, we do not need to backfill and can directly forward from upstream.
995 async fn apply_split_change_after_yield_barrier(
996 &mut self,
997 barrier_epoch: EpochPair,
998 stage: &mut BackfillStage,
999 to_apply_mutation: ApplyMutationAfterBarrier,
1000 ) -> StreamExecutorResult<bool> {
1001 match to_apply_mutation {
1002 ApplyMutationAfterBarrier::SourceChangeSplit {
1003 target_splits,
1004 should_trim_state,
1005 } => {
1006 self.source_split_change_count.inc();
1007 {
1008 if self
1009 .update_state_if_changed(
1010 barrier_epoch,
1011 target_splits,
1012 stage,
1013 should_trim_state,
1014 )
1015 .await?
1016 {
1017 // Note: we don't rebuild backfill_stream here, due to some complex lifetime issues.
1018 Ok(true)
1019 } else {
1020 Ok(false)
1021 }
1022 }
1023 }
1024 ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
1025 }
1026 }
1027
1028 /// Returns `true` if split changed. Otherwise `false`.
1029 async fn update_state_if_changed(
1030 &mut self,
1031 barrier_epoch: EpochPair,
1032 target_splits: Vec<SplitImpl>,
1033 stage: &mut BackfillStage,
1034 should_trim_state: bool,
1035 ) -> StreamExecutorResult<bool> {
1036 let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());
1037
1038 let mut split_changed = false;
1039 // Take out old states (immutable, only used to build target_state and check for added/dropped splits).
1040 // Will be set to target_state in the end.
1041 let old_states = std::mem::take(&mut stage.states);
1042 let committed_reader = self
1043 .backfill_state_store
1044 .new_committed_reader(barrier_epoch)
1045 .await?;
1046 // Iterate over the target (assigned) splits
1047 // - check if any new splits are added
1048 // - build target_state
1049 for split in &target_splits {
1050 let split_id = split.id();
1051 if let Some(s) = old_states.get(&split_id) {
1052 target_state.insert(split_id, s.clone());
1053 } else {
1054 split_changed = true;
1055
1056 let backfill_state = committed_reader
1057 .try_recover_from_state_store(&split_id)
1058 .await?;
1059 match backfill_state {
1060 None => {
1061 // Newly added split. We don't need to backfill.
1062 // Note that this branch is different from the initial barrier (BackfillStateInner::Backfilling(None) there).
1063 target_state.insert(
1064 split_id,
1065 BackfillStateWithProgress {
1066 state: BackfillState::Finished,
1067 num_consumed_rows: 0,
1068 target_offset: None,
1069 },
1070 );
1071 }
1072 Some(backfill_state) => {
1073 // Migrated split. Backfill if unfinished.
1074 target_state.insert(split_id, backfill_state);
1075 }
1076 }
1077 }
1078 }
1079
1080 // Checks dropped splits
1081 for existing_split_id in old_states.keys() {
1082 if !target_state.contains_key(existing_split_id) {
1083 tracing::info!("split dropping detected: {}", existing_split_id);
1084 split_changed = true;
1085 }
1086 }
1087
1088 if split_changed {
1089 let dropped_splits = stage
1090 .states
1091 .extract_if(|split_id, _| !target_state.contains_key(split_id))
1092 .map(|(split_id, _)| split_id);
1093
1094 if should_trim_state {
1095 // trim dropped splits' state
1096 self.backfill_state_store.trim_state(dropped_splits).await?;
1097 }
1098 tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
1099 } else {
1100 debug_assert_eq!(old_states, target_state);
1101 }
1102 stage.states = target_state;
1103 stage.splits = target_splits;
1104 stage.debug_assert_consistent();
1105 Ok(split_changed)
1106 }
1107
1108 /// For split change during forward stage, all newly added splits should be already finished.
1109 // We just need to update the state store if necessary.
1110 async fn apply_split_change_forward_stage_after_yield_barrier(
1111 &mut self,
1112 barrier_epoch: EpochPair,
1113 target_splits: Vec<SplitImpl>,
1114 states: &mut BackfillStates,
1115 should_trim_state: bool,
1116 ) -> StreamExecutorResult<()> {
1117 self.source_split_change_count.inc();
1118 {
1119 self.update_state_if_changed_forward_stage(
1120 barrier_epoch,
1121 target_splits,
1122 states,
1123 should_trim_state,
1124 )
1125 .await?;
1126 }
1127
1128 Ok(())
1129 }
1130
1131 async fn update_state_if_changed_forward_stage(
1132 &mut self,
1133 barrier_epoch: EpochPair,
1134 target_splits: Vec<SplitImpl>,
1135 states: &mut BackfillStates,
1136 should_trim_state: bool,
1137 ) -> StreamExecutorResult<()> {
1138 let target_splits: HashSet<SplitId> =
1139 target_splits.into_iter().map(|split| split.id()).collect();
1140
1141 let mut split_changed = false;
1142 let mut newly_added_splits = vec![];
1143
1144 let committed_reader = self
1145 .backfill_state_store
1146 .new_committed_reader(barrier_epoch)
1147 .await?;
1148
1149 // Checks added splits
1150 for split_id in &target_splits {
1151 if !states.contains_key(split_id) {
1152 split_changed = true;
1153
1154 let backfill_state = committed_reader
1155 .try_recover_from_state_store(split_id)
1156 .await?;
1157 match &backfill_state {
1158 None => {
1159 // Newly added split. We don't need to backfill!
1160 newly_added_splits.push(split_id.clone());
1161 }
1162 Some(backfill_state) => {
1163 // Migrated split. It should also be finished since we are in forwarding stage.
1164 match backfill_state.state {
1165 BackfillState::Finished => {}
1166 _ => {
1167 return Err(anyhow::anyhow!(
1168 "Unexpected backfill state: {:?}",
1169 backfill_state
1170 )
1171 .into());
1172 }
1173 }
1174 }
1175 }
1176 states.insert(
1177 split_id.clone(),
1178 backfill_state.unwrap_or(BackfillStateWithProgress {
1179 state: BackfillState::Finished,
1180 num_consumed_rows: 0,
1181 target_offset: None,
1182 }),
1183 );
1184 }
1185 }
1186
1187 // Checks dropped splits
1188 for existing_split_id in states.keys() {
1189 if !target_splits.contains(existing_split_id) {
1190 tracing::info!("split dropping detected: {}", existing_split_id);
1191 split_changed = true;
1192 }
1193 }
1194
1195 if split_changed {
1196 tracing::info!(
1197 target_splits = ?target_splits,
1198 "apply split change"
1199 );
1200
1201 let dropped_splits = states.extract_if(|split_id, _| !target_splits.contains(split_id));
1202
1203 if should_trim_state {
1204 // trim dropped splits' state
1205 self.backfill_state_store
1206 .trim_state(dropped_splits.map(|(k, _v)| k))
1207 .await?;
1208 }
1209
1210 // For migrated splits, and existing splits, we do not need to update
1211 // state store, but only for newly added splits.
1212 self.backfill_state_store
1213 .set_states(
1214 newly_added_splits
1215 .into_iter()
1216 .map(|split_id| {
1217 (
1218 split_id,
1219 BackfillStateWithProgress {
1220 state: BackfillState::Finished,
1221 num_consumed_rows: 0,
1222 target_offset: None,
1223 },
1224 )
1225 })
1226 .collect(),
1227 )
1228 .await?;
1229 }
1230
1231 Ok(())
1232 }
1233}
1234
/// Compares two Kafka offsets numerically (offsets are decimal `i64` strings).
///
/// Panics if either string fails to parse as `i64`; a non-numeric offset here
/// indicates corrupted split state rather than a recoverable condition.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let lhs: i64 = a.parse().unwrap();
    let rhs: i64 = b.parse().unwrap();
    lhs.cmp(&rhs)
}
1240
impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
    /// Entry point: drives the inner executor over the upstream `input` stream
    /// and boxes the resulting message stream for the executor framework.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.inner.execute(self.input).boxed()
    }
}
1246
impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Manual impl printing only identifying fields.
        // NOTE(review): prints "SourceBackfillExecutor" (without the "Inner"
        // suffix of the actual type) — presumably intentional for log readability.
        f.debug_struct("SourceBackfillExecutor")
            .field("source_id", &self.source_id)
            .field("column_ids", &self.column_ids)
            .field("stream_key", &self.info.stream_key)
            .finish()
    }
}
1256
/// Tracks the independent reasons the backfill stream may be paused.
///
/// Several reasons can be active at once; `is_paused` reports whether any of
/// the first three is set. `reader_paused` records the reader's actual state
/// (maintained by the executor loop elsewhere in this file) rather than a
/// pause reason itself.
struct PauseControl {
    // Paused due to backfill order control
    backfill_paused: bool,
    // Paused due to self-pause, e.g. let barrier catch up
    self_paused: bool,
    // Paused due to Pause command from meta, pause_on_next_bootstrap
    command_paused: bool,
    // reader paused
    reader_paused: bool,
}
1267
1268impl PauseControl {
1269 fn new() -> Self {
1270 Self {
1271 backfill_paused: false,
1272 self_paused: false,
1273 command_paused: false,
1274 reader_paused: false,
1275 }
1276 }
1277
1278 fn is_paused(&self) -> bool {
1279 self.backfill_paused || self.command_paused || self.self_paused
1280 }
1281
1282 /// returns whether we need to pause the reader.
1283 fn backfill_pause(&mut self) {
1284 if self.backfill_paused {
1285 tracing::warn!("backfill_pause invoked twice");
1286 }
1287 self.backfill_paused = true;
1288 }
1289
1290 /// returns whether we need to resume the reader.
1291 /// same precedence as command.
1292 fn backfill_resume(&mut self) -> bool {
1293 if !self.backfill_paused {
1294 tracing::warn!("backfill_resume invoked twice");
1295 }
1296 !self.command_paused
1297 }
1298
1299 /// returns whether we need to pause the reader.
1300 fn self_pause(&mut self) {
1301 assert!(
1302 !self.backfill_paused,
1303 "backfill stream should not be read when backfill_pause is set"
1304 );
1305 assert!(
1306 !self.command_paused,
1307 "backfill stream should not be read when command_pause is set"
1308 );
1309 if self.self_paused {
1310 tracing::warn!("self_pause invoked twice");
1311 }
1312 self.self_paused = true;
1313 }
1314
1315 /// returns whether we need to resume the reader.
1316 /// `self_resume` has the lowest precedence,
1317 /// it can only resume if we are not paused due to `backfill_paused` or `command_paused`.
1318 fn self_resume(&mut self) -> bool {
1319 self.self_paused = false;
1320 !(self.backfill_paused || self.command_paused)
1321 }
1322
1323 /// returns whether we need to pause the reader.
1324 fn command_pause(&mut self) {
1325 if self.command_paused {
1326 tracing::warn!("command_pause invoked twice");
1327 }
1328 self.command_paused = true;
1329 }
1330
1331 /// returns whether we need to resume the reader.
1332 /// same precedence as backfill.
1333 fn command_resume(&mut self) -> bool {
1334 if !self.command_paused {
1335 tracing::warn!("command_resume invoked twice");
1336 }
1337 self.command_paused = false;
1338 !self.backfill_paused
1339 }
1340}