use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::CreateType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::PbBarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;

use crate::MetaResult;
use crate::barrier::backfill_order_control::BackfillOrderState;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::{Command, CreateStreamingJobCommandInfo, CreateStreamingJobType};
use crate::manager::MetadataManager;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::stream::{SourceChange, SourceManagerRef};

type ConsumedRows = u64;
type BufferedRows = u64;

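/// The backfill state of an individual actor, as reported by compute nodes.
///
/// Transitions: `Init` -> `ConsumingUpstream` -> `Done`. The latter two states
/// carry the numbers of consumed and buffered rows.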
#[derive(Clone, Copy, Debug)]
enum BackfillState {
    Init,
    ConsumingUpstream(#[expect(dead_code)] Epoch, ConsumedRows, BufferedRows),
    Done(ConsumedRows, BufferedRows),
}

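/// Follow-up work produced when an actor reports progress: backfill fragments
/// that become ready to start, and locality-provider state tables that can be
/// truncated once their fragments are finished.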
#[derive(Debug, Default)]
pub(super) struct PendingBackfillFragments {
    /// Backfill fragments that become ready to run after this update.
    pub next_backfill_nodes: Vec<FragmentId>,
    /// State tables of finished locality-provider fragments to truncate.
    pub truncate_locality_provider_state_tables: Vec<TableId>,
}

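/// Progress of all backfill actors of a single creating streaming job.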
#[derive(Debug)]
pub(super) struct Progress {
    /// Backfill state of each tracked actor.
    states: HashMap<ActorId, BackfillState>,
    /// Controls the order in which backfill fragments are started.
    backfill_order_state: BackfillOrderState,
    done_count: usize,

    /// The upstream type each backfill actor consumes from.
    backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,

    /// How many times each upstream materialized view is consumed by this job.
    upstream_mv_count: HashMap<TableId, usize>,
    /// Total key count of all upstream materialized views, weighted by
    /// `upstream_mv_count`.
    upstream_mvs_total_key_count: u64,
    mv_backfill_consumed_rows: u64,
    source_backfill_consumed_rows: u64,
    mv_backfill_buffered_rows: u64,

    /// The DDL definition of the streaming job.
    definition: String,
    /// The creation type (foreground or background) of the streaming job.
    create_type: CreateType,
}

impl Progress {
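    /// Creates a new [`Progress`] for the given actors, all of which start in
    /// [`BackfillState::Init`]. Panics if `actors` is empty.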
    fn new(
        actors: impl IntoIterator<Item = (ActorId, BackfillUpstreamType)>,
        upstream_mv_count: HashMap<TableId, usize>,
        upstream_total_key_count: u64,
        definition: String,
        create_type: CreateType,
        backfill_order_state: BackfillOrderState,
    ) -> Self {
        let mut states = HashMap::new();
        let mut backfill_upstream_types = HashMap::new();
        for (actor, backfill_upstream_type) in actors {
            states.insert(actor, BackfillState::Init);
            backfill_upstream_types.insert(actor, backfill_upstream_type);
        }
        assert!(!states.is_empty());

        Self {
            states,
            backfill_upstream_types,
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count: upstream_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            definition,
            create_type,
            backfill_order_state,
        }
    }

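    /// Updates the progress of a single actor and the derived row counters.
    ///
    /// Returns the backfill fragments unblocked by this update and the state
    /// tables that can now be truncated. Panics if the actor is untracked or
    /// reports `Done` more than once.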
    fn update(
        &mut self,
        actor: ActorId,
        new_state: BackfillState,
        upstream_total_key_count: u64,
    ) -> PendingBackfillFragments {
        let mut result = PendingBackfillFragments::default();
        self.upstream_mvs_total_key_count = upstream_total_key_count;
        let total_actors = self.states.len();
        let backfill_upstream_type = self.backfill_upstream_types.get(&actor).unwrap();

        let mut old_consumed_row = 0;
        let mut new_consumed_row = 0;
        let mut old_buffered_row = 0;
        let mut new_buffered_row = 0;
        match self.states.remove(&actor).unwrap() {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                old_consumed_row = consumed_rows;
                old_buffered_row = buffered_rows;
            }
            BackfillState::Done(_, _) => panic!("should not report done multiple times"),
        };
        match &new_state {
            BackfillState::Init => {}
            BackfillState::ConsumingUpstream(_, consumed_rows, buffered_rows) => {
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
            }
            BackfillState::Done(consumed_rows, buffered_rows) => {
                tracing::debug!("actor {} done", actor);
                new_consumed_row = *consumed_rows;
                new_buffered_row = *buffered_rows;
                self.done_count += 1;
                let before_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                result.next_backfill_nodes = self.backfill_order_state.finish_actor(actor);
                let after_backfill_nodes = self
                    .backfill_order_state
                    .current_backfill_node_fragment_ids();
                // Fragments that were in progress before this update but not
                // after it have just finished backfilling.
                let last_backfill_nodes_iter = before_backfill_nodes
                    .into_iter()
                    .filter(|x| !after_backfill_nodes.contains(x));
                result.truncate_locality_provider_state_tables = last_backfill_nodes_iter
                    .filter_map(|fragment_id| {
                        self.backfill_order_state
                            .get_locality_fragment_state_table_mapping()
                            .get(&fragment_id)
                    })
                    .flatten()
                    .copied()
                    .collect();
                tracing::debug!(
                    "{} actors out of {} complete",
                    self.done_count,
                    total_actors,
                );
            }
        };
        debug_assert!(
            new_consumed_row >= old_consumed_row,
            "backfill progress should not go backward"
        );
        debug_assert!(
            new_buffered_row >= old_buffered_row,
            "backfill progress should not go backward"
        );
        match backfill_upstream_type {
            BackfillUpstreamType::MView => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Source => {
                self.source_backfill_consumed_rows += new_consumed_row - old_consumed_row;
            }
            BackfillUpstreamType::Values => {
                // `Values` backfills instantly, so its progress is not tracked.
            }
            BackfillUpstreamType::LocalityProvider => {
                self.mv_backfill_consumed_rows += new_consumed_row - old_consumed_row;
                // Locality providers also buffer upstream rows before yielding
                // them, so track the buffered rows as well.
                self.mv_backfill_buffered_rows += new_buffered_row - old_buffered_row;
            }
        }
        self.states.insert(actor, new_state);
        result
    }

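    /// Returns whether all tracked actors are done.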
    fn is_done(&self) -> bool {
        tracing::trace!(
            "Progress::is_done? {}, {}, {:?}",
            self.done_count,
            self.states.len(),
            self.states
        );
        self.done_count == self.states.len()
    }

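    /// Returns the ids of all tracked actors.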
    fn actors(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.states.keys().cloned()
    }

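    /// Renders the progress as a human-readable string, e.g.
    /// `12.34% (1234/10000)` for mview backfill and `1234 rows consumed` for
    /// source backfill.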
    fn calculate_progress(&self) -> String {
        if self.is_done() || self.states.is_empty() {
            return "100%".to_owned();
        }
        let mut mv_count = 0;
        let mut source_count = 0;
        for backfill_upstream_type in self.backfill_upstream_types.values() {
            match backfill_upstream_type {
                BackfillUpstreamType::MView => mv_count += 1,
                BackfillUpstreamType::Source => source_count += 1,
                BackfillUpstreamType::Values => (),
                // Locality providers consume upstream MVs, so count them as
                // mview backfill.
                BackfillUpstreamType::LocalityProvider => mv_count += 1,
            }
        }

        let mv_progress = (mv_count > 0).then_some({
            // Rows buffered by locality providers must be consumed as well
            // before the job is done, so include them in the denominator.
            let total_rows_to_consume =
                self.upstream_mvs_total_key_count + self.mv_backfill_buffered_rows;
            if total_rows_to_consume == 0 {
                // Avoid division by zero; the job is not done yet, so report
                // just under 100%.
                "99.99%".to_owned()
            } else {
                let mut progress =
                    self.mv_backfill_consumed_rows as f64 / (total_rows_to_consume as f64);
                if progress > 1.0 {
                    progress = 0.9999;
                }
                format!(
                    "{:.2}% ({}/{})",
                    progress * 100.0,
                    self.mv_backfill_consumed_rows,
                    total_rows_to_consume
                )
            }
        });
        let source_progress = (source_count > 0).then_some(format!(
            "{} rows consumed",
            self.source_backfill_consumed_rows
        ));
        match (mv_progress, source_progress) {
            (Some(mv_progress), Some(source_progress)) => {
                format!(
                    "MView Backfill: {}, Source Backfill: {}",
                    mv_progress, source_progress
                )
            }
            (Some(mv_progress), None) => mv_progress,
            (None, Some(source_progress)) => source_progress,
            (None, None) => "Unknown".to_owned(),
        }
    }
}

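/// A creating streaming job tracked until it finishes.
///
/// A job is either newly submitted in the current meta term
/// ([`TrackingJob::new`]) or restored after a meta restart
/// ([`TrackingJob::recovered`]).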
pub struct TrackingJob {
    job_id: JobId,
    is_recovered: bool,
    source_change: Option<SourceChange>,
}

impl std::fmt::Display for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}{}",
            self.job_id,
            if self.is_recovered { "<recovered>" } else { "" }
        )
    }
}

impl TrackingJob {
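    /// Creates a tracking job for a newly submitted streaming job.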
    pub(crate) fn new(job_id: JobId, source_change: Option<SourceChange>) -> Self {
        Self {
            job_id,
            is_recovered: false,
            source_change,
        }
    }

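    /// Creates a tracking job for a streaming job restored during recovery.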
    pub(crate) fn recovered(job_id: JobId, source_change: Option<SourceChange>) -> Self {
        Self {
            job_id,
            is_recovered: true,
            source_change,
        }
    }

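    /// Marks the job as finished in the catalog and applies the pending source
    /// change, if any.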
    pub(crate) async fn finish(
        self,
        metadata_manager: &MetadataManager,
        source_manager: &SourceManagerRef,
    ) -> MetaResult<()> {
        metadata_manager
            .catalog_controller
            .finish_streaming_job(self.job_id)
            .await?;
        if let Some(source_change) = self.source_change {
            source_manager.apply_source_change(source_change).await;
        }
        Ok(())
    }
}

impl std::fmt::Debug for TrackingJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.is_recovered {
            write!(f, "TrackingJob::New({})", self.job_id)
        } else {
            write!(f, "TrackingJob::Recovered({})", self.job_id)
        }
    }
}

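/// Side effects staged between checkpoints and committed when a checkpoint
/// barrier completes.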
#[derive(Debug, Default)]
pub(super) struct StagingCommitInfo {
    /// Jobs that have finished and await being marked as created.
    pub finished_jobs: Vec<TrackingJob>,
    /// State tables to truncate at the next checkpoint.
    pub table_ids_to_truncate: Vec<TableId>,
}

pub(super) enum UpdateProgressResult {
    None,
    /// All actors of the job are done, so the job itself has finished.
    Finished(TrackingJob, PendingBackfillFragments),
    /// Some backfill fragments finished, but the job is still in progress.
    BackfillNodeFinished(PendingBackfillFragments),
}

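/// Tracks the progress of all creating streaming jobs.
///
/// Maps each job to its [`Progress`] and each backfill actor back to its job,
/// and stages finished jobs until the next checkpoint.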
#[derive(Default, Debug)]
pub(super) struct CreateMviewProgressTracker {
    /// Progress of the create-streaming-job DDL identified by the `JobId`.
    progress_map: HashMap<JobId, (Progress, TrackingJob)>,

    /// Finds the job id from a reported backfill actor.
    actor_map: HashMap<ActorId, JobId>,

    /// Backfill fragments that are ready to start but not yet scheduled.
    pending_backfill_nodes: Vec<FragmentId>,

    /// Side effects to commit at the next checkpoint.
    staging_commit_info: StagingCommitInfo,
}

impl CreateMviewProgressTracker {
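    /// Rebuilds the tracker from jobs restored after a meta restart.
    ///
    /// Jobs without actors to track are staged as finished immediately; the
    /// rest start over as `ConsumingUpstream` with zeroed counters, since meta
    /// does not persist per-actor backfill progress.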
    pub fn recover(
        jobs: impl IntoIterator<
            Item = (
                JobId,
                (String, &InflightStreamingJobInfo, BackfillOrderState),
            ),
        >,
        version_stats: &HummockVersionStats,
    ) -> Self {
        let mut actor_map = HashMap::new();
        let mut progress_map = HashMap::new();
        let mut finished_jobs = vec![];
        for (creating_job_id, (definition, job_info, backfill_order_state)) in jobs {
            let source_backfill_fragments = StreamJobFragments::source_backfill_fragments_impl(
                job_info
                    .fragment_infos
                    .iter()
                    .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
            );
            let source_change = if source_backfill_fragments.is_empty() {
                None
            } else {
                Some(SourceChange::CreateJobFinished {
                    finished_backfill_fragments: source_backfill_fragments,
                })
            };
            let tracking_job = TrackingJob::recovered(creating_job_id, source_change);
            let actors = job_info.tracking_progress_actor_ids();
            if actors.is_empty() {
                finished_jobs.push(tracking_job);
            } else {
                let mut states = HashMap::new();
                let mut backfill_upstream_types = HashMap::new();

                for (actor, backfill_upstream_type) in actors {
                    actor_map.insert(actor, creating_job_id);
                    states.insert(actor, BackfillState::ConsumingUpstream(Epoch(0), 0, 0));
                    backfill_upstream_types.insert(actor, backfill_upstream_type);
                }

                let progress = Self::recover_progress(
                    states,
                    backfill_upstream_types,
                    StreamJobFragments::upstream_table_counts_impl(
                        job_info
                            .fragment_infos
                            .values()
                            .map(|fragment| &fragment.nodes),
                    ),
                    definition,
                    version_stats,
                    backfill_order_state,
                );
                progress_map.insert(creating_job_id, (progress, tracking_job));
            }
        }
        Self {
            progress_map,
            actor_map,
            pending_backfill_nodes: Vec::new(),
            staging_commit_info: StagingCommitInfo {
                finished_jobs,
                table_ids_to_truncate: vec![],
            },
        }
    }

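    /// Builds the [`Progress`] of a recovered job. All counters start from
    /// zero and are refreshed by subsequent progress reports.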
    fn recover_progress(
        states: HashMap<ActorId, BackfillState>,
        backfill_upstream_types: HashMap<ActorId, BackfillUpstreamType>,
        upstream_mv_count: HashMap<TableId, usize>,
        definition: String,
        version_stats: &HummockVersionStats,
        backfill_order_state: BackfillOrderState,
    ) -> Progress {
        let upstream_mvs_total_key_count =
            calculate_total_key_count(&upstream_mv_count, version_stats);
        Progress {
            states,
            backfill_order_state,
            backfill_upstream_types,
            // Counters start from zero on recovery and are refreshed by
            // subsequent progress reports.
            done_count: 0,
            upstream_mv_count,
            upstream_mvs_total_key_count,
            mv_backfill_consumed_rows: 0,
            source_backfill_consumed_rows: 0,
            mv_backfill_buffered_rows: 0,
            definition,
            // Recovered jobs are tracked as background jobs.
            create_type: CreateType::Background,
        }
    }

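    /// Returns the DDL progress of all tracked jobs, keyed by job id.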
    pub fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        self.progress_map
            .iter()
            .map(|(job_id, (x, _))| {
                let ddl_progress = DdlProgress {
                    id: job_id.as_raw_id() as u64,
                    statement: x.definition.clone(),
                    create_type: x.create_type.as_str().to_owned(),
                    progress: x.calculate_progress(),
                };
                (*job_id, ddl_progress)
            })
            .collect()
    }

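    /// Adds the new job (if any) to the tracker and applies the progress
    /// reported by compute nodes, staging finished jobs and tables to truncate
    /// until the next checkpoint.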
    pub(super) fn update_tracking_jobs<'a>(
        &mut self,
        info: Option<&CreateStreamingJobCommandInfo>,
        create_mview_progress: impl IntoIterator<Item = &'a CreateMviewProgress>,
        version_stats: &HummockVersionStats,
    ) {
        let mut table_ids_to_truncate = vec![];
        // Collect the jobs that finish at this barrier.
        let finished_commands = {
            let mut commands = vec![];
            // Add the new command to the tracker, if any.
            if let Some(create_job_info) = info
                && let Some(command) = self.add(create_job_info, version_stats)
            {
                // A job with no actors to track finishes immediately.
                commands.push(command);
            }
            // Update the progress of all tracked jobs.
            for progress in create_mview_progress {
                match self.update(progress, version_stats) {
                    UpdateProgressResult::None => {
                        tracing::trace!(?progress, "update progress");
                    }
                    UpdateProgressResult::Finished(command, pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        self.queue_backfill(pending.next_backfill_nodes);
                        tracing::trace!(?progress, "finish progress");
                        commands.push(command);
                    }
                    UpdateProgressResult::BackfillNodeFinished(pending) => {
                        table_ids_to_truncate
                            .extend(pending.truncate_locality_provider_state_tables.clone());
                        tracing::trace!(
                            ?progress,
                            next_backfill_nodes = ?pending.next_backfill_nodes,
                            "start next backfill node"
                        );
                        self.queue_backfill(pending.next_backfill_nodes);
                    }
                }
            }
            commands
        };

        for command in finished_commands {
            self.staging_commit_info.finished_jobs.push(command);
        }
        self.staging_commit_info
            .table_ids_to_truncate
            .extend(table_ids_to_truncate);
    }

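    /// Applies a collected command and its barrier-complete responses to the
    /// tracker. Returns the staged commit info if this barrier is a checkpoint
    /// barrier.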
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) -> Option<StagingCommitInfo> {
        let new_tracking_job_info =
            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
                match job_type {
                    CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                        Some(info)
                    }
                    CreateStreamingJobType::SnapshotBackfill(_) => {
                        // Snapshot-backfill jobs are not tracked by this tracker.
                        None
                    }
                }
            } else {
                None
            };
        self.update_tracking_jobs(
            new_tracking_job_info,
            resps
                .into_iter()
                .flat_map(|resp| resp.create_mview_progress.iter()),
            version_stats,
        );
        for job_id in command.map(Command::jobs_to_drop).into_iter().flatten() {
            // The job is being dropped, so stop tracking it.
            self.cancel_command(job_id);
        }
        if barrier_info.kind.is_checkpoint() {
            Some(take(&mut self.staging_commit_info))
        } else {
            None
        }
    }

    fn queue_backfill(&mut self, backfill_nodes: impl IntoIterator<Item = FragmentId>) {
        self.pending_backfill_nodes.extend(backfill_nodes);
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        take(&mut self.pending_backfill_nodes)
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        !self.staging_commit_info.finished_jobs.is_empty()
    }

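    /// Stops tracking the job with the given id, e.g. when it is cancelled or
    /// dropped.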
    pub(super) fn cancel_command(&mut self, id: JobId) {
        let _ = self.progress_map.remove(&id);
        self.staging_commit_info
            .finished_jobs
            .retain(|x| x.job_id != id);
        self.actor_map.retain(|_, job_id| *job_id != id);
    }

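    /// Clears all tracked jobs and staged commit info.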
    pub fn abort_all(&mut self) {
        self.actor_map.clear();
        self.staging_commit_info = StagingCommitInfo::default();
        self.progress_map.clear();
    }

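    /// Adds a new create-streaming-job command to track.
    ///
    /// Returns the job as an immediately-finished [`TrackingJob`] if there are
    /// no actors to track; otherwise starts tracking and returns `None`.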
    pub fn add(
        &mut self,
        info: &CreateStreamingJobCommandInfo,
        version_stats: &HummockVersionStats,
    ) -> Option<TrackingJob> {
        tracing::trace!(?info, "add job to track");
        let (info, actors) = {
            let CreateStreamingJobCommandInfo {
                stream_job_fragments,
                ..
            } = info;
            let actors = stream_job_fragments.tracking_progress_actor_ids();
            if actors.is_empty() {
                // The job can be finished immediately.
                return Some(TrackingJob::new(
                    info.stream_job_fragments.stream_job_id,
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: stream_job_fragments
                            .source_backfill_fragments(),
                    }),
                ));
            }
            (info.clone(), actors)
        };

        let CreateStreamingJobCommandInfo {
            stream_job_fragments: table_fragments,
            definition,
            create_type,
            fragment_backfill_ordering,
            locality_fragment_state_table_mapping,
            ..
        } = info;

        let creating_job_id = table_fragments.stream_job_id();
        let upstream_mv_count = table_fragments.upstream_table_counts();
        let upstream_total_key_count: u64 =
            calculate_total_key_count(&upstream_mv_count, version_stats);

        for (actor, _backfill_upstream_type) in &actors {
            self.actor_map.insert(*actor, creating_job_id);
        }

        let backfill_order_state = BackfillOrderState::new(
            fragment_backfill_ordering,
            &table_fragments,
            locality_fragment_state_table_mapping,
        );
        let progress = Progress::new(
            actors,
            upstream_mv_count,
            upstream_total_key_count,
            definition,
            create_type.into(),
            backfill_order_state,
        );
        let old = self.progress_map.insert(
            creating_job_id,
            (
                progress,
                TrackingJob::new(
                    creating_job_id,
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: table_fragments.source_backfill_fragments(),
                    }),
                ),
            ),
        );
        assert!(old.is_none());
        None
    }

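    /// Updates progress for the reporting actor's job.
    ///
    /// Returns [`UpdateProgressResult::Finished`] with the tracking job once
    /// all of the job's actors are done.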
    pub fn update(
        &mut self,
        progress: &CreateMviewProgress,
        version_stats: &HummockVersionStats,
    ) -> UpdateProgressResult {
        tracing::trace!(?progress, "update progress");
        let actor = progress.backfill_actor_id;
        let Some(table_id) = self.actor_map.get(&actor).copied() else {
            // On restart, backfill actors always report their progress, even
            // if they had already finished before recovery, because a single
            // actor cannot tell whether the whole job is finished. It is
            // therefore expected to receive reports for untracked jobs here.
            tracing::info!(
                "no tracked progress for actor {}, the stream job could already be finished",
                actor
            );
            return UpdateProgressResult::None;
        };

        let new_state = if progress.done {
            BackfillState::Done(progress.consumed_rows, progress.buffered_rows)
        } else {
            BackfillState::ConsumingUpstream(
                progress.consumed_epoch.into(),
                progress.consumed_rows,
                progress.buffered_rows,
            )
        };

        match self.progress_map.entry(table_id) {
            Entry::Occupied(mut o) => {
                let progress_state = &mut o.get_mut().0;

                let upstream_total_key_count: u64 =
                    calculate_total_key_count(&progress_state.upstream_mv_count, version_stats);

                tracing::debug!(?table_id, "updating progress for table");
                let pending = progress_state.update(actor, new_state, upstream_total_key_count);

                if progress_state.is_done() {
                    tracing::debug!(
                        "all actors done for creating mview with table_id {}!",
                        table_id
                    );

                    // Clean up the actor-to-job mapping for this job.
                    for actor in o.get().0.actors() {
                        self.actor_map.remove(&actor);
                    }
                    assert!(pending.next_backfill_nodes.is_empty());
                    UpdateProgressResult::Finished(o.remove().1, pending)
                } else if !pending.next_backfill_nodes.is_empty()
                    || !pending.truncate_locality_provider_state_tables.is_empty()
                {
                    UpdateProgressResult::BackfillNodeFinished(pending)
                } else {
                    UpdateProgressResult::None
                }
            }
            Entry::Vacant(_) => {
                tracing::warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                UpdateProgressResult::None
            }
        }
    }
}

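/// Estimates the total number of keys to consume from all upstream tables,
/// weighting each table's key count by how many times it is consumed. For
/// example, a table with 1000 keys consumed by two backfill fragments
/// contributes 2000.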
fn calculate_total_key_count(
    table_count: &HashMap<TableId, usize>,
    version_stats: &HummockVersionStats,
) -> u64 {
    table_count
        .iter()
        .map(|(table_id, count)| {
            assert_ne!(*count, 0);
            *count as u64
                * version_stats
                    .table_stats
                    .get(&table_id.as_raw_id())
                    .map_or(0, |stat| stat.total_key_count as u64)
        })
        .sum()
}