risingwave_stream/executor/source/source_backfill_executor.rs
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::time::Instant;

use anyhow::anyhow;
use either::Either;
use futures::stream::{PollNext, select_with_strategy};
use itertools::Itertools;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::catalog::ColumnId;
use risingwave_common::id::SourceId;
use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedIntCounter};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::JsonbVal;
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
    BackfillInfo, BoxSourceChunkStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
    SplitMetaData,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::common::ThrottleType;
use risingwave_storage::store::TryWaitEpochOptions;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

use super::executor_core::StreamSourceCore;
use super::source_backfill_state_table::BackfillStateTableHandler;
use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::UpdateMutation;
use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::task::CreateMviewProgressReporter;

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
    /// `None` means not started yet. It's the initial state.
    /// XXX: perhaps we can also set it to a low watermark instead of `None`.
    Backfilling(Option<String>),
    /// Backfill is stopped at this offset (inclusive). The source needs to filter out messages
    /// at or before this offset.
    SourceCachingUp(String),
    Finished,
}
pub type BackfillStates = HashMap<SplitId, BackfillStateWithProgress>;

/// Only the `state` field is the real state used for failover.
/// The other fields are for observability (but we still need to persist them).
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct BackfillStateWithProgress {
    pub state: BackfillState,
    pub num_consumed_rows: u64,
    /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
    /// This is initialized with the latest available offset in the connector (if the connector
    /// provides the ability to fetch it), so that we can finish backfilling even when upstream
    /// doesn't emit any data.
    pub target_offset: Option<String>,
}

impl BackfillStateWithProgress {
    pub fn encode_to_json(self) -> JsonbVal {
        serde_json::to_value(self).unwrap().into()
    }

    pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
    }
}
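
// A minimal sketch of how the state above round-trips through `JsonbVal` for persistence.
// The module and test names are illustrative; the test only relies on the
// `encode_to_json`/`restore_from_json` pair defined above.
#[cfg(test)]
mod backfill_state_serde_tests {
    use super::*;

    #[test]
    fn backfill_state_json_roundtrip() {
        let state = BackfillStateWithProgress {
            state: BackfillState::Backfilling(Some("42".to_owned())),
            num_consumed_rows: 10,
            target_offset: Some("100".to_owned()),
        };
        // `encode_to_json` consumes `self`, so clone first to compare afterwards.
        let encoded = state.clone().encode_to_json();
        let decoded = BackfillStateWithProgress::restore_from_json(encoded).unwrap();
        assert_eq!(state, decoded);
    }
}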

pub struct SourceBackfillExecutor<S: StateStore> {
    pub inner: SourceBackfillExecutorInner<S>,
    /// Upstream changelog stream, which may contain metadata columns, e.g. `_rw_offset`.
    pub input: Executor,
}

pub struct SourceBackfillExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Streaming source info for the external source.
    source_id: SourceId,
    source_name: String,
    column_ids: Vec<ColumnId>,
    source_desc_builder: Option<SourceDescBuilder>,
    backfill_state_store: BackfillStateTableHandler<S>,

    /// Metrics for monitoring.
    metrics: Arc<StreamingMetrics>,
    source_split_change_count: LabelGuardedIntCounter,

    // /// Receiver of barrier channel.
    // barrier_receiver: Option<UnboundedReceiver<Barrier>>,
    /// System parameter reader to read the barrier interval.
    system_params: SystemParamsReaderRef,

    /// Rate limit in rows/s.
    rate_limit_rps: Option<u32>,

    progress: CreateMviewProgressReporter,
}

/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram of how it works.
///
/// Note: all of the fields should contain all assigned splits, so we can `unwrap()` safely on `get()`.
#[derive(Debug)]
struct BackfillStage {
    states: BackfillStates,
    /// A copy of all splits (including unfinished and finished ones) assigned to the actor.
    ///
    /// Note: the offsets are not updated. Use the offset in `states` to update a split before
    /// using it (see `get_latest_unfinished_splits`).
    splits: Vec<SplitImpl>,
}

enum ApplyMutationAfterBarrier {
    SourceChangeSplit {
        target_splits: Vec<SplitImpl>,
        should_trim_state: bool,
    },
    ConnectorPropsChange,
}

impl BackfillStage {
    fn total_backfilled_rows(&self) -> u64 {
        self.states.values().map(|s| s.num_consumed_rows).sum()
    }

    fn debug_assert_consistent(&self) {
        if cfg!(debug_assertions) {
            let all_splits: HashSet<_> = self.splits.iter().map(|split| split.id()).collect();
            assert_eq!(
                self.states.keys().cloned().collect::<HashSet<_>>(),
                all_splits
            );
        }
    }

    /// Gets unfinished splits with the latest offsets according to the backfill states.
    fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
        let mut unfinished_splits = Vec::new();
        for split in &self.splits {
            let state = &self.states.get(split.id().as_ref()).unwrap().state;
            match state {
                BackfillState::Backfilling(Some(offset)) => {
                    let mut updated_split = split.clone();
                    updated_split.update_in_place(offset.clone())?;
                    unfinished_splits.push(updated_split);
                }
                BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
                BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
            }
        }
        Ok(unfinished_splits)
    }

    /// Updates the backfill states and `target_offset`s, and returns whether the row from the
    /// upstream `SourceExecutor` is visible.
    fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
        let mut vis = false;
        let state = self.states.get_mut(split_id).unwrap();
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(None) => {
                // Backfilling for this split has not started yet. Ignore this row.
            }
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // Continue backfilling. Ignore this row.
                    }
                    Ordering::Equal => {
                        // Backfilling for this split finished exactly here.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // Backfilling for this split has produced more data than the current
                        // progress of the source. We should stop backfilling, and filter out rows
                        // from upstream with offset <= backfill_offset.
                        *state_inner = BackfillState::SourceCachingUp(backfill_offset.clone());
                    }
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // The source has caught up, but it doesn't contain the last backfilled row.
                        // This may happen, e.g., if Kafka performed compaction.
                        vis = true;
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Equal => {
                        // The source has just caught up with backfilling.
                        *state_inner = BackfillState::Finished;
                    }
                    Ordering::Greater => {
                        // The source is still behind backfilling.
                    }
                }
            }
            BackfillState::Finished => {
                vis = true;
                // This split's backfilling is finished; we are waiting for the other splits.
            }
        }
        if matches!(state_inner, BackfillState::Backfilling(_)) {
            state.target_offset = Some(offset.to_owned());
        }
        if vis {
            debug_assert_eq!(*state_inner, BackfillState::Finished);
        }
        vis
    }

    /// Updates the backfill states and returns whether the row backfilled from the external
    /// system is visible.
    fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
        let state = self.states.get_mut(split_id).unwrap();
        state.num_consumed_rows += 1;
        let state_inner = &mut state.state;
        match state_inner {
            BackfillState::Backfilling(_old_offset) => {
                if let Some(target_offset) = &state.target_offset
                    && compare_kafka_offset(offset, target_offset).is_ge()
                {
                    // Note 1: if target_offset == offset, it seems we could mark the state as
                    // `Finished` directly, without waiting for upstream to catch up and dropping
                    // duplicated messages. But that's not true if target_offset was fetched from
                    // somewhere else, like the Kafka high watermark: in that case, upstream hasn't
                    // reached the target_offset yet.
                    //
                    // Note 2: after this, all following rows in the current chunk will be invisible.
                    //
                    // Note 3: if target_offset is `None` (e.g., when upstream doesn't emit
                    // messages at all), we keep backfilling.
                    *state_inner = BackfillState::SourceCachingUp(offset.to_owned());
                } else {
                    *state_inner = BackfillState::Backfilling(Some(offset.to_owned()));
                }
                true
            }
            BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
                // Backfilling has stopped. Ignore this row.
                false
            }
        }
    }
}
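
// A sketch of the per-split state machine driven by `handle_backfill_row` and
// `handle_upstream_row` above. The split id and offsets are illustrative; the test assumes
// `SplitId` converts from `&str` (it is an `Arc<str>` alias) and that offsets compare
// numerically, as in `compare_kafka_offset`.
#[cfg(test)]
mod backfill_stage_tests {
    use super::*;

    #[test]
    fn split_state_transitions() {
        let split_id: SplitId = "split-0".into();
        let mut stage = BackfillStage {
            states: HashMap::from([(
                split_id.clone(),
                BackfillStateWithProgress {
                    state: BackfillState::Backfilling(None),
                    num_consumed_rows: 0,
                    target_offset: Some("10".to_owned()),
                },
            )]),
            // `handle_*_row` only touch `states`, so `splits` can stay empty here.
            splits: vec![],
        };

        // Backfilled rows below the target offset stay visible and advance the offset.
        assert!(stage.handle_backfill_row(&split_id, "5"));
        // Reaching the target offset switches to `SourceCachingUp` (this row is still visible).
        assert!(stage.handle_backfill_row(&split_id, "10"));
        // Further backfilled rows are dropped.
        assert!(!stage.handle_backfill_row(&split_id, "11"));
        // The upstream row at the cached-up offset finishes the split but is deduplicated.
        assert!(!stage.handle_upstream_row(&split_id, "10"));
        // From `Finished` on, upstream rows are forwarded.
        assert!(stage.handle_upstream_row(&split_id, "11"));
        assert_eq!(
            stage.states.get(&split_id).unwrap().state,
            BackfillState::Finished
        );
    }
}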
259
impl<S: StateStore> SourceBackfillExecutorInner<S> {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        actor_ctx: ActorContextRef,
        info: ExecutorInfo,
        stream_source_core: StreamSourceCore<S>,
        metrics: Arc<StreamingMetrics>,
        system_params: SystemParamsReaderRef,
        backfill_state_store: BackfillStateTableHandler<S>,
        rate_limit_rps: Option<u32>,
        progress: CreateMviewProgressReporter,
    ) -> Self {
        let source_split_change_count = metrics
            .source_split_change_count
            .with_guarded_label_values(&[
                &stream_source_core.source_id.to_string(),
                &stream_source_core.source_name,
                &actor_ctx.id.to_string(),
                &actor_ctx.fragment_id.to_string(),
            ]);

        Self {
            actor_ctx,
            info,
            source_id: stream_source_core.source_id,
            source_name: stream_source_core.source_name,
            column_ids: stream_source_core.column_ids,
            source_desc_builder: stream_source_core.source_desc_builder,
            backfill_state_store,
            metrics,
            source_split_change_count,
            system_params,
            rate_limit_rps,
            progress,
        }
    }

    async fn build_stream_source_reader(
        &self,
        source_desc: &SourceDesc,
        splits: Vec<SplitImpl>,
    ) -> StreamExecutorResult<(BoxSourceChunkStream, HashMap<SplitId, BackfillInfo>)> {
        let column_ids = source_desc
            .columns
            .iter()
            .map(|column_desc| column_desc.column_id)
            .collect_vec();
        let source_ctx = SourceContext::new(
            self.actor_ctx.id,
            self.source_id,
            self.actor_ctx.fragment_id,
            self.source_name.clone(),
            source_desc.metrics.clone(),
            SourceCtrlOpts {
                chunk_size: limited_chunk_size(self.rate_limit_rps),
                split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split a txn
            },
            source_desc.source.config.clone(),
            None,
        );

        // We check the watermark to decide whether we need to backfill at all:
        // e.g., for a Kafka topic-partition without any data, we don't need to backfill.
        // If we did not check here, the executor could only learn that it's finished
        // when data comes in, which would be annoying for blocking DDL.

        let (stream, res) = source_desc
            .source
            .build_stream(Some(splits), column_ids, Arc::new(source_ctx), false)
            .await
            .map_err(StreamExecutorError::connector_error)?;
        Ok((
            apply_rate_limit(stream, self.rate_limit_rps).boxed(),
            res.backfill_info,
        ))
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute(mut self, input: Executor) {
        let mut input = input.execute();

        // Poll the upstream to get the first barrier.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        let owned_splits = barrier
            .initial_split_assignment(self.actor_ctx.id)
            .unwrap_or(&[])
            .to_vec();

        let mut pause_control = PauseControl::new();
        if barrier.is_backfill_pause_on_startup(self.actor_ctx.fragment_id) {
            pause_control.backfill_pause();
        }
        if barrier.is_pause_on_startup() {
            pause_control.command_pause();
        }
        yield Message::Barrier(barrier);

        let source_desc_builder: SourceDescBuilder = self.source_desc_builder.take().unwrap();
        let mut source_desc = source_desc_builder
            .build()
            .map_err(StreamExecutorError::connector_error)?;

        // Source backfill only applies to Kafka, so we don't need to get Pulsar's `message_id_data_idx`.
        let (Some(split_idx), Some(offset_idx), _) = get_split_offset_col_idx(&source_desc.columns)
        else {
            unreachable!("Partition and offset columns must be set.");
        };

        self.backfill_state_store.init_epoch(first_epoch).await?;

        let mut backfill_states: BackfillStates = HashMap::new();
        {
            let committed_reader = self
                .backfill_state_store
                .new_committed_reader(first_epoch)
                .await?;
            for split in &owned_splits {
                let split_id = split.id();
                let backfill_state = committed_reader
                    .try_recover_from_state_store(&split_id)
                    .await?
                    .unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Backfilling(None),
                        num_consumed_rows: 0,
                        target_offset: None, // init with None
                    });
                backfill_states.insert(split_id, backfill_state);
            }
        }
        let mut backfill_stage = BackfillStage {
            states: backfill_states,
            splits: owned_splits,
        };
        backfill_stage.debug_assert_consistent();

        let (source_chunk_reader, backfill_info) = self
            .build_stream_source_reader(
                &source_desc,
                backfill_stage.get_latest_unfinished_splits()?,
            )
            .instrument_await("source_build_reader")
            .await?;
        for (split_id, info) in &backfill_info {
            let state = backfill_stage.states.get_mut(split_id).unwrap();
            match info {
                BackfillInfo::NoDataToBackfill => {
                    state.state = BackfillState::Finished;
                }
                BackfillInfo::HasDataToBackfill { latest_offset } => {
                    // Note: later we will override this with the offset from the source message,
                    // and it's possible for it to become smaller than this value.
                    state.target_offset = Some(latest_offset.clone());
                }
            }
        }
        tracing::debug!(?backfill_stage, "source backfill started");

        fn select_strategy(_: &mut ()) -> PollNext {
            futures::stream::PollNext::Left
        }

        // We choose the "prefer upstream" strategy here, because:
        // - When the upstream source's workload is high (i.e., Kafka has new incoming data),
        //   it only makes backfilling slower. Chunks from upstream are simply dropped, so there's
        //   not much overhead, and this likely also affects other running jobs less.
        // - When the upstream source becomes less busy, SourceBackfill can begin to catch up.
        let mut backfill_stream = select_with_strategy(
            input.by_ref().map(Either::Left),
            source_chunk_reader.map(Either::Right),
            select_strategy,
        );

        type PausedReader = Option<impl Stream>;
        let mut paused_reader: PausedReader = None;

        macro_rules! pause_reader {
            () => {
                if !pause_control.reader_paused {
                    let (left, right) = backfill_stream.into_inner();
                    backfill_stream = select_with_strategy(
                        left,
                        futures::stream::pending().boxed().map(Either::Right),
                        select_strategy,
                    );
                    // XXX: do we have to store the original reader? Can we simply rebuild the reader later?
                    paused_reader = Some(right);
                    pause_control.reader_paused = true;
                }
            };
        }

        macro_rules! resume_reader {
            () => {
                if pause_control.reader_paused {
                    backfill_stream = select_with_strategy(
                        input.by_ref().map(Either::Left),
                        paused_reader
                            .take()
                            .expect("should have paused reader to resume"),
                        select_strategy,
                    );
                    pause_control.reader_paused = false;
                }
            };
        }

        if pause_control.is_paused() {
            pause_reader!();
        }

        let state_store = self
            .backfill_state_store
            .state_store()
            .state_store()
            .clone();
        let table_id = self.backfill_state_store.state_store().table_id();
        let mut state_table_initialized = false;
        {
            let source_backfill_row_count = self
                .metrics
                .source_backfill_row_count
                .with_guarded_label_values(&[
                    &self.source_id.to_string(),
                    &self.source_name,
                    &self.actor_ctx.id.to_string(),
                    &self.actor_ctx.fragment_id.to_string(),
                ]);

            // We allow data to flow for `WAIT_BARRIER_MULTIPLE_TIMES` * `expected_barrier_latency_ms`
            // milliseconds, considering some other latencies, like the network and the cost in Meta.
            let mut max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                as u128
                * WAIT_BARRIER_MULTIPLE_TIMES;
            let mut last_barrier_time = Instant::now();

            // The main logic of the loop is in `handle_upstream_row` and `handle_backfill_row`.
            'backfill_loop: while let Some(either) = backfill_stream.next().await {
                match either {
                    // Upstream
                    Either::Left(msg) => {
                        let Ok(msg) = msg else {
                            let e = msg.unwrap_err();
                            tracing::error!(
                                error = ?e.as_report(),
                                source_id = %self.source_id,
                                "stream source reader error",
                            );
                            GLOBAL_ERROR_METRICS.user_source_error.report([
                                "SourceReaderError".to_owned(),
                                self.source_id.to_string(),
                                self.source_name.clone(),
                                self.actor_ctx.fragment_id.to_string(),
                            ]);

                            let (reader, _backfill_info) = self
                                .build_stream_source_reader(
                                    &source_desc,
                                    backfill_stage.get_latest_unfinished_splits()?,
                                )
                                .await?;

                            backfill_stream = select_with_strategy(
                                input.by_ref().map(Either::Left),
                                reader.map(Either::Right),
                                select_strategy,
                            );
                            continue;
                        };
                        match msg {
                            Message::Barrier(barrier) => {
                                last_barrier_time = Instant::now();

                                if pause_control.self_resume() {
                                    resume_reader!();
                                }

                                let mut maybe_mutation = None;
                                if let Some(ref mutation) = barrier.mutation.as_deref() {
                                    match mutation {
                                        Mutation::Pause => {
                                            // `pause_reader!` must not be invoked consecutively more than once.
                                            pause_control.command_pause();
                                            pause_reader!();
                                        }
                                        Mutation::Resume => {
                                            // `paused_reader.take()` must not be invoked consecutively more than once.
                                            if pause_control.command_resume() {
                                                resume_reader!();
                                            }
                                        }
                                        Mutation::StartFragmentBackfill { fragment_ids } => {
                                            if fragment_ids.contains(&self.actor_ctx.fragment_id)
                                                && pause_control.backfill_resume()
                                            {
                                                resume_reader!();
                                            }
                                        }
                                        Mutation::SourceChangeSplit(actor_splits) => {
                                            tracing::info!(
                                                actor_splits = ?actor_splits,
                                                "source change split received"
                                            );
                                            maybe_mutation = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| {
                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
                                                        target_splits,
                                                        should_trim_state: true,
                                                    }
                                                });
                                        }
                                        Mutation::Update(UpdateMutation {
                                            actor_splits, ..
                                        }) => {
                                            maybe_mutation = actor_splits
                                                .get(&self.actor_ctx.id)
                                                .cloned()
                                                .map(|target_splits| {
                                                    ApplyMutationAfterBarrier::SourceChangeSplit {
                                                        target_splits,
                                                        should_trim_state: false,
                                                    }
                                                });
                                        }
                                        Mutation::ConnectorPropsChange(props_change) => {
                                            if let Some(props_plaintext) =
                                                props_change.get(&self.source_id.as_raw_id())
                                            {
                                                source_desc
                                                    .update_reader(props_plaintext.clone())?;

                                                maybe_mutation = Some(
                                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
                                                );
                                            }
                                        }
                                        Mutation::Throttle(fragment_to_apply) => {
                                            if let Some(entry) =
                                                fragment_to_apply.get(&self.actor_ctx.fragment_id)
                                                && entry.throttle_type() == ThrottleType::Backfill
                                                && entry.rate_limit != self.rate_limit_rps
                                            {
                                                tracing::info!(
                                                    "updating rate limit from {:?} to {:?}",
                                                    self.rate_limit_rps,
                                                    entry.rate_limit
                                                );
                                                self.rate_limit_rps = entry.rate_limit;
                                                // Rebuild the reader.
                                                let (reader, _backfill_info) = self
                                                    .build_stream_source_reader(
                                                        &source_desc,
                                                        backfill_stage
                                                            .get_latest_unfinished_splits()?,
                                                    )
                                                    .await?;

                                                backfill_stream = select_with_strategy(
                                                    input.by_ref().map(Either::Left),
                                                    reader.map(Either::Right),
                                                    select_strategy,
                                                );
                                            }
                                        }
                                        _ => {}
                                    }
                                }
                                async fn rebuild_reader_on_split_changed(
                                    this: &SourceBackfillExecutorInner<impl StateStore>,
                                    backfill_stage: &BackfillStage,
                                    source_desc: &SourceDesc,
                                ) -> StreamExecutorResult<BoxSourceChunkStream> {
                                    // Rebuild `backfill_stream`.
                                    // Note: this is a nested function rather than a method, due to
                                    // some complex lifetime issues.
                                    let latest_unfinished_splits =
                                        backfill_stage.get_latest_unfinished_splits()?;
                                    tracing::info!(
                                        "actor {:?} apply source split change to {:?}",
                                        this.actor_ctx.id,
                                        latest_unfinished_splits
                                    );

                                    // Replace the source reader with a new one reflecting the new state.
                                    let (reader, _backfill_info) = this
                                        .build_stream_source_reader(
                                            source_desc,
                                            latest_unfinished_splits,
                                        )
                                        .await?;

                                    Ok(reader)
                                }

                                self.backfill_state_store
                                    .set_states(backfill_stage.states.clone())
                                    .await?;
                                self.backfill_state_store.commit(barrier.epoch).await?;

                                if self.should_report_finished(&backfill_stage.states) {
                                    // Drop the backfill Kafka consumers.
                                    backfill_stream = select_with_strategy(
                                        input.by_ref().map(Either::Left),
                                        futures::stream::pending().boxed().map(Either::Right),
                                        select_strategy,
                                    );

                                    self.progress.finish(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    let is_checkpoint = barrier.is_checkpoint();
                                    // Yield the barrier after reporting progress.
                                    yield Message::Barrier(barrier);

                                    if let Some(to_apply_mutation) = maybe_mutation {
                                        self.apply_split_change_after_yield_barrier(
                                            barrier_epoch,
                                            &mut backfill_stage,
                                            to_apply_mutation,
                                        )
                                        .await?;
                                    }

                                    if !state_table_initialized {
                                        if is_checkpoint {
                                            // This is for `self.backfill_finished()` to be safe:
                                            // wait until this actor can read all actors' written data.
                                            // We wait for the 2nd epoch.
                                            let epoch = barrier_epoch.prev;
                                            tracing::info!("waiting for epoch: {}", epoch);
                                            state_store
                                                .try_wait_epoch(
                                                    HummockReadEpoch::Committed(epoch),
                                                    TryWaitEpochOptions { table_id },
                                                )
                                                .await?;
                                            tracing::info!("finished waiting for epoch: {}", epoch);
                                            state_table_initialized = true;
                                        }
                                    } else {
                                        // Even after we report finished, we still don't exit the
                                        // loop, because we need to handle split migration.
                                        assert!(
                                            state_table_initialized,
                                            "state table should be initialized before checking backfill finished"
                                        );
                                        if self.backfill_finished(&backfill_stage.states).await? {
                                            tracing::info!("source backfill finished");
                                            break 'backfill_loop;
                                        }
                                    }
                                } else {
                                    self.progress.update_for_source_backfill(
                                        barrier.epoch,
                                        backfill_stage.total_backfilled_rows(),
                                    );

                                    let barrier_epoch = barrier.epoch;
                                    // Yield the barrier after reporting progress.
                                    yield Message::Barrier(barrier);

                                    if let Some(to_apply_mutation) = maybe_mutation
                                        && self
                                            .apply_split_change_after_yield_barrier(
                                                barrier_epoch,
                                                &mut backfill_stage,
                                                to_apply_mutation,
                                            )
                                            .await?
                                    {
                                        let reader = rebuild_reader_on_split_changed(
                                            &self,
                                            &backfill_stage,
                                            &source_desc,
                                        )
                                        .await?;

                                        backfill_stream = select_with_strategy(
                                            input.by_ref().map(Either::Left),
                                            reader.map(Either::Right),
                                            select_strategy,
                                        );
                                    }
                                }
                            }
                            Message::Chunk(chunk) => {
                                // We need to iterate over all rows because there might be multiple splits in one chunk.
                                // Note: we assume the offsets from the source are monotonically increasing for the algorithm to work correctly.
                                let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                                for (i, (_, row)) in chunk.rows().enumerate() {
                                    let split = row.datum_at(split_idx).unwrap().into_utf8();
                                    let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                                    let vis = backfill_stage.handle_upstream_row(split, offset);
                                    new_vis.set(i, vis);
                                }
                                // Emit the chunk if the visibility is not empty, i.e., some splits have finished backfilling.
                                let new_vis = new_vis.finish();
                                if new_vis.count_ones() != 0 {
                                    let new_chunk = chunk.clone_with_vis(new_vis);
                                    yield Message::Chunk(new_chunk);
                                }
                            }
                            Message::Watermark(_) => {
                                // Ignore watermarks during backfill.
                            }
                        }
                    }
                    // backfill
                    Either::Right(msg) => {
                        let chunk = msg?;

                        if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                            // Pause to let the barrier catch up via the backpressure of the snapshot stream.
                            pause_control.self_pause();
                            pause_reader!();

                            // The max wait barrier time has been exceeded, so the source will be
                            // paused. We can be sure the source was not already paused, since it
                            // was still receiving stream chunks.
                            tracing::warn!(
                                "source {} paused, wait barrier for {:?}",
                                self.info.identity,
                                last_barrier_time.elapsed()
                            );

                            // Only update `max_wait_barrier_time_ms` to capture `barrier_interval_ms`
                            // changes here, to avoid frequently accessing the shared `system_params`.
                            max_wait_barrier_time_ms =
                                self.system_params.load().barrier_interval_ms() as u128
                                    * WAIT_BARRIER_MULTIPLE_TIMES;
                        }
                        let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

                        for (i, (_, row)) in chunk.rows().enumerate() {
                            let split_id = row.datum_at(split_idx).unwrap().into_utf8();
                            let offset = row.datum_at(offset_idx).unwrap().into_utf8();
                            let vis = backfill_stage.handle_backfill_row(split_id, offset);
                            new_vis.set(i, vis);
                        }

                        let new_vis = new_vis.finish();
                        let card = new_vis.count_ones();
                        if card != 0 {
                            let new_chunk = chunk.clone_with_vis(new_vis);
                            yield Message::Chunk(new_chunk);
                            source_backfill_row_count.inc_by(card as u64);
                        }
                    }
                }
            }
        }

        std::mem::drop(backfill_stream);
        let mut states = backfill_stage.states;
        // Make sure the `Finished` state is persisted.
        self.backfill_state_store.set_states(states.clone()).await?;

        // All splits have finished backfilling. Now we only forward the source data.
        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Barrier(barrier) => {
                    let mut split_changed = None;
                    if let Some(ref mutation) = barrier.mutation.as_deref() {
                        match mutation {
                            Mutation::Pause | Mutation::Resume => {
                                // We don't need to do anything; it's handled by upstream.
                            }
                            Mutation::SourceChangeSplit(actor_splits) => {
                                tracing::info!(
                                    actor_splits = ?actor_splits,
                                    "source change split received"
                                );
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, true));
                            }
                            Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                                split_changed = actor_splits
                                    .get(&self.actor_ctx.id)
                                    .cloned()
                                    .map(|target_splits| (target_splits, &mut states, false));
                            }
                            _ => {}
                        }
                    }
                    self.backfill_state_store.commit(barrier.epoch).await?;
                    let barrier_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((target_splits, state, should_trim_state)) = split_changed {
                        self.apply_split_change_forward_stage_after_yield_barrier(
                            barrier_epoch,
                            target_splits,
                            state,
                            should_trim_state,
                        )
                        .await?;
                    }
                }
                Message::Chunk(chunk) => {
                    yield Message::Chunk(chunk);
                }
                Message::Watermark(watermark) => {
                    yield Message::Watermark(watermark);
                }
            }
        }
    }
880
    /// Decides when we should call `progress.finish()` to let the blocking DDL return.
    /// We report as soon as every split reaches `SourceCachingUp` (or `Finished`); otherwise the
    /// DDL might be blocked forever, waiting for upstream messages to come.
    ///
    /// Note: split migration (online scaling) is related to progress tracking.
    /// - For foreground DDL, scaling is not allowed before progress is finished.
    /// - For background DDL, scaling is skipped when progress is not finished, and can be
    ///   triggered by recreating actors during recovery.
    ///
    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
    fn should_report_finished(&self, states: &BackfillStates) -> bool {
        states.values().all(|state| {
            matches!(
                state.state,
                BackfillState::Finished | BackfillState::SourceCachingUp(_)
            )
        })
    }

    /// Returns whether all splits have entered the `Finished` state.
    ///
    /// We check all splits of the source here, including other actors' splits, before going to
    /// the forward stage. Otherwise, if we `break`ed early and an unfinished split were migrated
    /// to this actor after rescheduling, we would still need to backfill it.
    ///
    /// Note: at the beginning, the actor can only read the state written by itself.
    /// It needs to _wait until it can read all actors' written data_,
    /// i.e., wait until the second checkpoint is available.
    ///
    /// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
    async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
        Ok(states
            .values()
            .all(|state| matches!(state.state, BackfillState::Finished))
            && self
                .backfill_state_store
                .scan_may_stale()
                .await?
                .into_iter()
                .all(|state| matches!(state.state, BackfillState::Finished)))
    }
920
    /// For newly added splits, we do not need to backfill; we can directly forward from upstream.
    async fn apply_split_change_after_yield_barrier(
        &mut self,
        barrier_epoch: EpochPair,
        stage: &mut BackfillStage,
        to_apply_mutation: ApplyMutationAfterBarrier,
    ) -> StreamExecutorResult<bool> {
        match to_apply_mutation {
            ApplyMutationAfterBarrier::SourceChangeSplit {
                target_splits,
                should_trim_state,
            } => {
                self.source_split_change_count.inc();
                // Note: we don't rebuild `backfill_stream` here, due to some complex lifetime
                // issues; the caller does it when this returns `true`.
                self.update_state_if_changed(
                    barrier_epoch,
                    target_splits,
                    stage,
                    should_trim_state,
                )
                .await
            }
            ApplyMutationAfterBarrier::ConnectorPropsChange => Ok(true),
        }
    }

    /// Returns `true` if the splits changed; otherwise `false`.
    async fn update_state_if_changed(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        stage: &mut BackfillStage,
        should_trim_state: bool,
    ) -> StreamExecutorResult<bool> {
        let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());

        let mut split_changed = false;
        // Take out the old states (immutable; only used to build `target_state` and to check for
        // added/dropped splits). `stage.states` will be set to `target_state` in the end.
        let old_states = std::mem::take(&mut stage.states);
        let committed_reader = self
            .backfill_state_store
            .new_committed_reader(barrier_epoch)
            .await?;
        // Iterate over the target (assigned) splits:
        // - check if any new splits are added
        // - build target_state
        for split in &target_splits {
            let split_id = split.id();
            if let Some(s) = old_states.get(&split_id) {
                target_state.insert(split_id, s.clone());
            } else {
                split_changed = true;

                let backfill_state = committed_reader
                    .try_recover_from_state_store(&split_id)
                    .await?;
                match backfill_state {
                    None => {
                        // Newly added split. We don't need to backfill it.
                        // Note that this branch is different from the initial barrier
                        // (`BackfillState::Backfilling(None)` there).
                        target_state.insert(
                            split_id,
                            BackfillStateWithProgress {
                                state: BackfillState::Finished,
                                num_consumed_rows: 0,
                                target_offset: None,
                            },
                        );
                    }
                    Some(backfill_state) => {
                        // Migrated split. Backfill it if unfinished.
                        target_state.insert(split_id, backfill_state);
                    }
                }
            }
        }

        // Check for dropped splits.
        for existing_split_id in old_states.keys() {
            if !target_state.contains_key(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            // `stage.states` was taken out above, so collect the dropped splits from `old_states`.
            let dropped_splits = old_states
                .keys()
                .filter(|split_id| !target_state.contains_key(*split_id))
                .cloned();

            if should_trim_state {
                // Trim the dropped splits' state.
                self.backfill_state_store.trim_state(dropped_splits).await?;
            }
            tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
        } else {
            debug_assert_eq!(old_states, target_state);
        }
        stage.states = target_state;
        stage.splits = target_splits;
        stage.debug_assert_consistent();
        Ok(split_changed)
    }
1034
    /// For a split change during the forward stage, all newly added splits should already be
    /// finished; we just need to update the state store if necessary.
    async fn apply_split_change_forward_stage_after_yield_barrier(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        states: &mut BackfillStates,
        should_trim_state: bool,
    ) -> StreamExecutorResult<()> {
        self.source_split_change_count.inc();
        self.update_state_if_changed_forward_stage(
            barrier_epoch,
            target_splits,
            states,
            should_trim_state,
        )
        .await?;

        Ok(())
    }
1057
    async fn update_state_if_changed_forward_stage(
        &mut self,
        barrier_epoch: EpochPair,
        target_splits: Vec<SplitImpl>,
        states: &mut BackfillStates,
        should_trim_state: bool,
    ) -> StreamExecutorResult<()> {
        let target_splits: HashSet<SplitId> =
            target_splits.into_iter().map(|split| split.id()).collect();

        let mut split_changed = false;
        let mut newly_added_splits = vec![];

        let committed_reader = self
            .backfill_state_store
            .new_committed_reader(barrier_epoch)
            .await?;

        // Check for added splits.
        for split_id in &target_splits {
            if !states.contains_key(split_id) {
                split_changed = true;

                let backfill_state = committed_reader
                    .try_recover_from_state_store(split_id)
                    .await?;
                match &backfill_state {
                    None => {
                        // Newly added split. We don't need to backfill it!
                        newly_added_splits.push(split_id.clone());
                    }
                    Some(backfill_state) => {
                        // Migrated split. It should also be finished, since we are in the forward stage.
                        match backfill_state.state {
                            BackfillState::Finished => {}
                            _ => {
                                return Err(anyhow::anyhow!(
                                    "Unexpected backfill state: {:?}",
                                    backfill_state
                                )
                                .into());
                            }
                        }
                    }
                }
                states.insert(
                    split_id.clone(),
                    backfill_state.unwrap_or(BackfillStateWithProgress {
                        state: BackfillState::Finished,
                        num_consumed_rows: 0,
                        target_offset: None,
                    }),
                );
            }
        }

        // Check for dropped splits.
        for existing_split_id in states.keys() {
            if !target_splits.contains(existing_split_id) {
                tracing::info!("split dropping detected: {}", existing_split_id);
                split_changed = true;
            }
        }

        if split_changed {
            tracing::info!(
                target_splits = ?target_splits,
                "apply split change"
            );

            let dropped_splits =
                states.extract_if(|split_id, _| !target_splits.contains(split_id));

            if should_trim_state {
                // Trim the dropped splits' state.
                self.backfill_state_store
                    .trim_state(dropped_splits.map(|(k, _v)| k))
                    .await?;
            }

            // For migrated and pre-existing splits, we do not need to update the state store;
            // only newly added splits need it.
            self.backfill_state_store
                .set_states(
                    newly_added_splits
                        .into_iter()
                        .map(|split_id| {
                            (
                                split_id,
                                BackfillStateWithProgress {
                                    state: BackfillState::Finished,
                                    num_consumed_rows: 0,
                                    target_offset: None,
                                },
                            )
                        })
                        .collect(),
                )
                .await?;
        }

        Ok(())
    }
}

fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    // Kafka offsets are stringified i64s, so compare them numerically.
    // (This panics on non-numeric offsets, which would indicate a non-Kafka split here.)
    let a = a.parse::<i64>().unwrap();
    let b = b.parse::<i64>().unwrap();
    a.cmp(&b)
}
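
// A small sketch of why offsets must be compared numerically rather than as strings:
// lexicographically "9" > "10", which would break the progress checks above. It assumes
// offsets are the string form of Kafka's i64 offsets, as the `parse::<i64>()` implies.
#[cfg(test)]
mod offset_order_tests {
    use super::*;

    #[test]
    fn kafka_offsets_compare_numerically() {
        assert_eq!(compare_kafka_offset("9", "10"), Ordering::Less);
        assert_eq!(compare_kafka_offset("10", "10"), Ordering::Equal);
        // A plain string comparison would get this wrong.
        assert!("9" > "10");
    }
}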

impl<S: StateStore> Execute for SourceBackfillExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.inner.execute(self.input).boxed()
    }
}

impl<S: StateStore> Debug for SourceBackfillExecutorInner<S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SourceBackfillExecutor")
            .field("source_id", &self.source_id)
            .field("column_ids", &self.column_ids)
            .field("stream_key", &self.info.stream_key)
            .finish()
    }
}

struct PauseControl {
    /// Paused due to backfill order control.
    backfill_paused: bool,
    /// Paused by the executor itself, e.g., to let the barrier catch up.
    self_paused: bool,
    /// Paused due to the `Pause` command from meta, or `pause_on_next_bootstrap`.
    command_paused: bool,
    /// Whether the reader (the backfill side of the stream) is currently paused.
    reader_paused: bool,
}

impl PauseControl {
    fn new() -> Self {
        Self {
            backfill_paused: false,
            self_paused: false,
            command_paused: false,
            reader_paused: false,
        }
    }

    fn is_paused(&self) -> bool {
        self.backfill_paused || self.command_paused || self.self_paused
    }

    /// Pauses due to backfill order control. The caller should pause the reader afterwards.
    fn backfill_pause(&mut self) {
        if self.backfill_paused {
            tracing::warn!("backfill_pause invoked twice");
        }
        self.backfill_paused = true;
    }

    /// Returns whether we can resume the reader.
    /// Same precedence as command.
    fn backfill_resume(&mut self) -> bool {
        if !self.backfill_paused {
            tracing::warn!("backfill_resume invoked without backfill_pause");
        }
        // Clear the backfill pause; otherwise `is_paused` would keep reporting paused
        // and `self_resume` could never resume the reader.
        self.backfill_paused = false;
        !self.command_paused
    }

    /// Pauses to let the barrier catch up. The caller should pause the reader afterwards.
    fn self_pause(&mut self) {
        assert!(
            !self.backfill_paused,
            "the backfill stream should not be read when backfill_pause is set"
        );
        assert!(
            !self.command_paused,
            "the backfill stream should not be read when command_pause is set"
        );
        if self.self_paused {
            tracing::warn!("self_pause invoked twice");
        }
        self.self_paused = true;
    }

    /// Returns whether we can resume the reader.
    /// `self_resume` has the lowest precedence:
    /// it can only resume if neither `backfill_paused` nor `command_paused` is set.
    fn self_resume(&mut self) -> bool {
        self.self_paused = false;
        !(self.backfill_paused || self.command_paused)
    }

    /// Pauses due to a command from meta. The caller should pause the reader afterwards.
    fn command_pause(&mut self) {
        if self.command_paused {
            tracing::warn!("command_pause invoked twice");
        }
        self.command_paused = true;
    }

    /// Returns whether we can resume the reader.
    /// Same precedence as backfill.
    fn command_resume(&mut self) -> bool {
        if !self.command_paused {
            tracing::warn!("command_resume invoked without command_pause");
        }
        self.command_paused = false;
        !self.backfill_paused
    }
}
1267}