use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{CreateType, ObjectId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::PbBarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::{Command, CreateStreamingJobCommandInfo, CreateStreamingJobType};
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

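/// Progress of one backfill actor, as reported via barrier responses: `Init`
/// (not yet started), `ConsumingUpstream` (in progress, with the latest epoch
/// and row counts), or `Done` (backfill finished).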
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

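/// Side effects to apply after an actor finishes backfilling, returned by
/// [`Progress::update`].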
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
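    /// Backfill fragments unblocked by the finished actor, to be started next
    /// according to the backfill order.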
    pub next_backfill_nodes: Vec<FragmentId>,
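    /// State tables of locality-provider fragments that have fully finished
    /// backfilling and can be truncated.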
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

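/// Backfill progress of a single creating streaming job, aggregated over all
/// of its backfill actors.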
#[derive(Debug)]
pub(super) struct Progress {
    states: HashMap<ActorId, BackfillState>,
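    /// Tracks which backfill fragments are currently running and which are
    /// still blocked, per the backfill order.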
    backfill_order_state: BackfillOrderState,
    done_count: usize,

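    /// The type of upstream each backfill actor consumes from.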
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

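    /// How many times each upstream materialized view is consumed by this job.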
    upstream_mv_count: HashMap<TableId, usize>,
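    /// Total key count of the upstream materialized views, weighted by
    /// `upstream_mv_count`. Refreshed from the latest version stats on every
    /// update and used as the denominator of the progress estimate.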
    upstream_mvs_total_key_count: u64,
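    /// Rows consumed (and buffered) so far, aggregated over all actors.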
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    mv_backfill_buffered_rows: u64,

    definition: String,
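    /// Whether the job is created in the foreground or in the background.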
    create_type: CreateType,
}

impl Progress {
    fn new(
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        definition: String,
        create_type: CreateType,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            definition,
            create_type,
            backfill_order_state,
        }
    }

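    /// Update the state of one backfill actor and refresh the total key count
    /// of the upstream materialized views. Returns the fragments that become
    /// ready to backfill next and the locality-provider state tables that can
    /// now be truncated.
    ///
    /// Panics if `actor` is untracked or reports `Done` more than once.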
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
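                // Rows from `Values` are not counted towards backfill progress.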
            }
            BackfillUpstreamType::LocalityProvider => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
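                // Locality-provider backfill counts towards the MView progress
                // and additionally tracks rows buffered before consumption.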
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.states.keys().cloned()
    }

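    /// Render a human-readable progress string, e.g.
    /// `MView Backfill: 50.00% (500/1000), Source Backfill: 42 rows consumed`.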
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                BackfillUpstreamType::LocalityProvider => mv_count += 1,
            }
        }

        let mv_progress = (mv_count > 0).then_some({
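            // Rows buffered by locality-provider fragments must eventually be
            // consumed too, so they count towards the total.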
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

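/// A creating streaming job under tracking. Once finished, the job is marked
/// as created in the catalog and its pending source change, if any, is applied.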
pub struct TrackingJob {
    job_id: ObjectId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
    pub(crate) fn new(job_id: ObjectId, source_change: Option<SourceChange>) -> Self {
        Self {
            job_id,
            is_recovered: false,
            source_change,
        }
    }

    pub(crate) fn recovered(job_id: ObjectId, source_change: Option<SourceChange>) -> Self {
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

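    /// Persist the job as finished in the catalog and apply the source change,
    /// if any.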
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }

    pub(crate) fn table_to_create(&self) -> TableId {
        (self.job_id as u32).into()
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

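/// Side effects staged until the next checkpoint barrier: jobs that have
/// finished and state tables to truncate.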
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    pub finished_jobs: Vec<TrackingJob>,
    pub table_ids_to_truncate: Vec<TableId>,
}

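/// Outcome of [`CreateMviewProgressTracker::update`]: nothing notable happened,
/// the whole job finished, or only an individual backfill node finished.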
pub(super) enum UpdateProgressResult {
    None,
    Finished(TrackingJob, PendingBackfillFragments),
    BackfillNodeFinished(PendingBackfillFragments),
}

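/// Tracks the backfill progress of all creating streaming jobs, backing DDL
/// progress reporting and deciding when a job can be marked as created.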
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    progress_map: HashMap<TableId, (Progress, TrackingJob)>,

    actor_map: HashMap<ActorId, TableId>,

    pending_backfill_nodes: Vec<FragmentId>,

    staging_commit_info: StagingCommitInfo,
}

impl CreateMviewProgressTracker {
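    /// Rebuild the tracker from jobs that were in flight before a recovery.
    /// Tracked actors restart in `ConsumingUpstream` with zeroed counters;
    /// jobs without any tracking actor are immediately staged as finished.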
    pub fn recover(
        jobs: impl IntoIterator<
            Item = (
                TableId,
                (String, &InflightStreamingJobInfo, BackfillOrderState),
            ),
        >,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let mut actor_map = HashMap::new();
        let mut progress_map = HashMap::new();
        let mut finished_jobs = vec![];
        for (creating_table_id, (definition, job_info, backfill_order_state)) in jobs {
            let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
                job_info
                    .fragment_infos
                    .iter()
                    .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
            );
            let source_change = if source_backfill_fragments.is_empty() {
                None
            } else {
                Some(SourceChange::CreateJobFinished {
                    finished_backfill_fragments: source_backfill_fragments,
                })
            };
            let tracking_job =
                TrackingJob::recovered(creating_table_id.table_id as _, source_change);
            let actors = job_info.tracking_progress_actor_ids();
            if actors.is_empty() {
                finished_jobs.push(tracking_job);
            } else {
                let mut states = HashMap::new();
                let mut backfill_upstream_types = HashMap::new();

                for (actor, backfill_upstream_type) in actors {
                    actor_map.insert(actor, creating_table_id);
                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                    backfill_upstream_types.insert(actor, backfill_upstream_type);
                }

                let progress = Self::recover_progress(
                    states,
                    backfill_upstream_types,
                    StreamJobFragments::upstream_table_counts_impl(
                        job_info
                            .fragment_infos
                            .values()
                            .map(|fragment| &fragment.nodes),
                    ),
                    definition,
                    version_stats,
                    backfill_order_state,
                );
                progress_map.insert(creating_table_id, (progress, tracking_job));
            }
        }
        Self {
            progress_map,
            actor_map,
            pending_backfill_nodes: Vec::new(),
            staging_commit_info: StagingCommitInfo {
                finished_jobs,
                table_ids_to_truncate: vec![],
            },
        }
    }

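    /// Reconstruct the [`Progress`] of a recovered job. The consumed-row
    /// counters restart from zero and the create type defaults to `Background`.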
    fn recover_progress(
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        definition: String,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            states,
            backfill_order_state,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            definition,
            create_type: CreateType::Background,
        }
    }

    pub fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        self.progress_map
            .iter()
            .map(|(table_id, (x, _))| {
                let table_id = table_id.table_id;
                let ddl_progress = DdlProgress {
                    id: table_id as u64,
                    statement: x.definition.clone(),
                    create_type: x.create_type.as_str().to_owned(),
                    progress: x.calculate_progress(),
                };
                (table_id, ddl_progress)
            })
            .collect()
    }

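    /// Add the job created by `info`, if any, and apply the progress reports
    /// collected with a barrier, staging finished jobs and table truncations
    /// for the next checkpoint.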
    pub(super) fn update_tracking_jobs<'a>(
        &mut self,
        info: Option<&CreateStreamingJobCommandInfo>,
        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
        version_stats: &HummockVersionStats,
    ) {
        let mut table_ids_to_truncate = vec![];
        let finished_commands = {
            let mut commands = vec![];
            if let Some(create_job_info) = info
                && let Some(command) = self.add(create_job_info, version_stats)
            {
                commands.push(command);
            }
            for progress in create_mview_progress {
                match self.update(progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished(command, pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        self.queue_backfill(pending.next_backfill_nodes);
                        tracing::trace!(?progress, "finish progress");
                        commands.push(command);
                    }
                    UpdateProgressResult::BackfillNodeFinished(pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        tracing::trace!(
                            ?progress,
                            next_backfill_nodes = ?pending.next_backfill_nodes,
                            "start next backfill node"
                        );
                        self.queue_backfill(pending.next_backfill_nodes);
                    }
                }
            }
            commands
        };

        for command in finished_commands {
            self.staging_commit_info.finished_jobs.push(command);
        }
        self.staging_commit_info
            .table_ids_to_truncate
            .extend(table_ids_to_truncate);
    }

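    /// Apply a collected barrier: update the tracked jobs with its command and
    /// the compute-node responses, drop cancelled jobs, and, on a checkpoint
    /// barrier, hand out the staged commit info.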
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) -> Option<StagingCommitInfo> {
        let new_tracking_job_info =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
                match job_type {
                    CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                        Some(info)
                    }
                    CreateStreamingJobType::SnapshotBackfill(_) => {
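                        // Snapshot backfill progress is not tracked here.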
                        None
                    }
                }
            } else {
                None
            };
        self.update_tracking_jobs(
            new_tracking_job_info,
            resps
                .into_iter()
                .flat_map(|resp| resp.create_mview_progress.iter()),
            version_stats,
        );
        for table_id in command.map(Command::jobs_to_drop).into_iter().flatten() {
            self.cancel_command(table_id);
        }
        if barrier_info.kind.is_checkpoint() {
            Some(take(&mut self.staging_commit_info))
        } else {
            None
        }
    }

    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        take(&mut self.pending_backfill_nodes)
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.staging_commit_info.finished_jobs.is_empty()
    }

    pub(super) fn cancel_command(&mut self, id: TableId) {
        let _ = self.progress_map.remove(&id);
        self.staging_commit_info
            .finished_jobs
            .retain(|x| x.table_to_create() != id);
        self.actor_map.retain(|_, table_id| *table_id != id);
    }

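    /// Stop tracking all jobs and drop any staged commit info, e.g. on recovery.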
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.staging_commit_info = StagingCommitInfo::default();
        self.progress_map.clear();
    }

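    /// Start tracking a newly created streaming job. Returns the job as
    /// immediately finished if it has no actor whose progress must be tracked.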
    pub fn add(
        &mut self,
        info: &CreateStreamingJobCommandInfo,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?info, "add job to track");
        let (info, actors) = {
            let CreateStreamingJobCommandInfo {
                stream_job_fragments,
                ..
            } = info;
            let actors = stream_job_fragments.tracking_progress_actor_ids();
            if actors.is_empty() {
                return Some(TrackingJob::new(
                    info.stream_job_fragments.stream_job_id.table_id as _,
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: stream_job_fragments
                            .source_backfill_fragments(),
                    }),
                ));
            }
            (info.clone(), actors)
        };

        let CreateStreamingJobCommandInfo {
            stream_job_fragments: table_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;

        let creating_job_id = table_fragments.stream_job_id();
        let upstream_mv_count = table_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        for (actor, _backfill_upstream_type) in &actors {
            self.actor_map.insert(*actor, creating_job_id);
        }

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            &table_fragments,
            locality_fragment_state_table_mapping,
        );
        let progress = Progress::new(
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            definition,
            create_type.into(),
            backfill_order_state,
        );
        let old = self.progress_map.insert(
            creating_job_id,
            (
                progress,
                TrackingJob::new(
                    creating_job_id.table_id as _,
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: table_fragments.source_backfill_fragments(),
                    }),
                ),
            ),
        );
        assert!(old.is_none());
        None
    }

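    /// Apply a single progress report from a backfill actor. Returns the
    /// finished job once all of its actors are done, or the pending backfill
    /// fragments when an individual backfill node finishes.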
    pub fn update(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let Some(table_id) = self.actor_map.get(&actor).copied() else {
            tracing::info!(
                "no tracked progress for actor {}, the stream job may have already finished",
                actor
            );
            return UpdateProgressResult::None;
        };

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        match self.progress_map.entry(table_id) {
            Entry::Occupied(mut o) => {
                let progress_state = &mut o.get_mut().0;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);

                tracing::debug!(?table_id, "updating progress for table");
                let pending = progress_state.update(actor, new_state, upstream_total_key_count);

                if progress_state.is_done() {
                    tracing::debug!(
                        "all actors done for creating mview with table_id {}!",
                        table_id
                    );

                    for actor in o.get().0.actors() {
                        self.actor_map.remove(&actor);
                    }
                    assert!(pending.next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished(o.remove().1, pending)
                } else if !pending.next_backfill_nodes.is_empty()
                    || !pending.truncate_locality_provider_state_tables.is_empty()
                {
                    UpdateProgressResult::BackfillNodeFinished(pending)
                } else {
                    UpdateProgressResult::None
                }
            }
            Entry::Vacant(_) => {
                tracing::warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which may have been cancelled"
                );
                UpdateProgressResult::None
            }
        }
    }
}

fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(&table_id.table_id)
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}
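
#[cfg(test)]
mod tests {
    // A minimal sketch showing how `calculate_total_key_count` weights each
    // upstream table's key count by the number of times the table is consumed.
    // It assumes the protobuf shapes of `HummockVersionStats` and `TableStats`
    // (a `table_stats` map keyed by table id, with a `total_key_count` field);
    // adjust if those definitions differ.
    use std::collections::HashMap;

    use risingwave_common::catalog::TableId;
    use risingwave_pb::hummock::{HummockVersionStats, TableStats};

    use super::calculate_total_key_count;

    #[test]
    fn total_key_count_is_weighted_by_upstream_count() {
        // Table 1 has 100 keys and is consumed twice; table 2 has 10 keys and
        // is consumed once; table 3 has no stats yet and contributes nothing.
        let table_count: HashMap<TableId, usize> = HashMap::from([
            (TableId::new(1), 2),
            (TableId::new(2), 1),
            (TableId::new(3), 1),
        ]);
        let version_stats = HummockVersionStats {
            table_stats: HashMap::from([
                (
                    1,
                    TableStats {
                        total_key_count: 100,
                        ..Default::default()
                    },
                ),
                (
                    2,
                    TableStats {
                        total_key_count: 10,
                        ..Default::default()
                    },
                ),
            ]),
            ..Default::default()
        };
        assert_eq!(calculate_total_key_count(&table_count, &version_stats), 210);
    }
}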