use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{REFRESH_TABLE_PROGRESS_TRACKER, SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
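    /// Clean up leftovers of jobs that were still being created when the meta node went down:
    /// dirty subscriptions, dirty creating jobs and refreshing-table state, scoped to
    /// `database_id` when given. Sources associated with the cleaned jobs are dropped as well.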
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

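    /// Ask the Hummock manager to purge state tables that are not in `all_state_table_ids`,
    /// i.e. tables no longer referenced by any in-flight fragment after recovery.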
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

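    /// List background creating streaming jobs, returning a map from job id to its definition.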
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

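    /// Resolve the in-flight fragment info of all actors from the catalog, grouped by database
    /// and streaming job. When `database_id` is given, only that database is loaded.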
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

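    /// From the current Hummock `version`, resolve the committed epoch of every state table and,
    /// for each upstream table of a recovered snapshot-backfill job, the change-log epochs that
    /// the downstream job still needs to replay. Also validates that all state tables of a
    /// background job share the same committed epoch.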
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch of table {}", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has no snapshot epoch filled for upstream table {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} is later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // The first log entry lines up with the downstream committed epoch.
                        } else {
                            bail!(
                                "resolved first log epoch {:?} of table {} does not match downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} at epoch {} is ahead of downstream at epoch {}, but has no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

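    /// Collect, per job, the actors of fragments that contain a `StreamCdcScan` node.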
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<JobId, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a JobId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_info in fragments.values() {
                if fragment_info
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(*job_id)
                        .or_insert_with(HashSet::new)
                        .extend(fragment_info.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

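    /// For each user table in the in-flight jobs, refill the `UpstreamSinkUnion` node of its
    /// union fragment with the current upstream sink infos, so that sinks created into the
    /// table are wired up again after recovery. Tables created by older versions without such
    /// a node are skipped.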
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "Table {} was created by an older version and has not been migrated yet, so sinks cannot be created or dropped on it.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

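    /// Reload the global runtime info snapshot used by the barrier worker for recovery:
    /// clean dirty jobs, resolve the in-flight graph, and recover background job progress,
    /// committed epochs, source splits and CDC table snapshot split assignments.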
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!("recovering background job progress");
        let background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        let mut info = if unreschedulable_jobs.is_empty() {
            info!("trigger offline scaling");
            self.resolve_graph_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        } else {
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        };

        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            info = self
                .resolve_graph_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            background_jobs
                                .contains_key(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = self.load_stream_actors(&info).await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.keys())
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let background_jobs = {
            let mut background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    background_jobs
                        .remove(job_id)
                        .map(|definition| (*job_id, definition))
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
            info.values().flat_map(|jobs| jobs.iter()),
        );

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            let generation = self
                .env
                .cdc_table_backfill_tracker
                .next_generation(cdc_table_ids.into_iter());
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                generation,
            )
        };
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

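    /// Database-scoped counterpart of [`Self::reload_runtime_info_impl`]. Returns `None` if the
    /// database has no in-flight streaming jobs to recover.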
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let all_info = self
            .resolve_graph_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .get(&database_id)
            .cloned()
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

        REFRESH_TABLE_PROGRESS_TRACKER
            .lock()
            .remove_tracker_by_database_id(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

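    /// Rebuild the [`StreamActor`]s for every actor in `all_info`, attaching each job's
    /// definition, expression context and config override.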
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_info) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_info.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}