use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor, StreamContext};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
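    /// Clean up catalog state left behind by unfinished DDL: drop dirty subscriptions
    /// and creating jobs, reset tables stuck in a refreshing state, and unregister the
    /// sources that were associated with the cleaned-up creating jobs.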
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

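        // Unregister the sources that belonged to the cleaned-up creating jobs.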
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

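    /// Ask the Hummock manager to purge state tables that are not in the given set of
    /// in-use state table ids.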
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

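    /// List the streaming jobs that are still being created in the background,
    /// returning a map from job id to its SQL definition.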
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(
                false,
                database_id.map(|database_id| database_id.database_id as _),
            )
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(id, definition, _init_at)| {
                let table_id = TableId::new(id as _);
                (table_id, definition)
            })
            .collect())
    }

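    /// Resolve the in-flight fragment info of all (or one) database from the catalog,
    /// keyed by database id, job id and fragment id. Actor placement is recomputed
    /// against the currently active worker nodes while loading.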
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let database_id = database_id.map(|database_id| database_id.database_id as _);

        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

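    /// Derive recovery epochs from the current Hummock `version`:
    /// - the committed epoch of every state table in the version, and
    /// - for upstream tables of snapshot-backfill background jobs, the change-log epochs
    ///   between the downstream's committed epoch and the upstream's committed epoch.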
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (TableId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

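    /// Collect the actors of `StreamCdcScan` (CDC backfill) fragments, grouped by job id.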
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<u32, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a TableId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_info in fragments.values() {
                if fragment_info
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(job_id.table_id())
                        .or_insert_with(HashSet::new)
                        .extend(fragment_info.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

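    /// For tables that accept sinks (`CREATE SINK ... INTO table`), refill the inputs of
    /// the `UpstreamSinkUnion` node in the recovered fragment graph with the current
    /// upstream sink infos from the catalog.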
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (tid, job) in table_map {
                    if acc.insert(tid.table_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate table id found: {:?}", tid).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let job_ids = jobs.keys().cloned().collect_vec();
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(job_ids.into_iter().map(|id| id as _).collect())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

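    /// Reload the global runtime info snapshot during recovery: clean dirty state,
    /// resolve the in-flight graph of every database, purge unused state tables from
    /// Hummock, and collect everything the barrier worker needs to resume.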
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    tracing::info!("recovering background job progress");
                    let background_jobs = self
                        .list_background_job_progress(None)
                        .await
                        .context("recover background job progress should not fail")?;

                    tracing::info!("recovered background job progress");

                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

                    tracing::info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(job_id.table_id as ObjectId)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

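                    // Resolve the in-flight graph info for all databases; actor placement is
                    // recomputed against the currently active workers (the "offline scaling"
                    // logged below).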
                    let mut info = if unreschedulable_jobs.is_empty() {
                        info!("trigger offline scaling");
                        self.resolve_graph_info(None, &active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "resolve actor info failed");
                            })?
                    } else {
                        bail!(
                            "Recovery for unreschedulable background jobs is not yet implemented. \
                            This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                            unreschedulable_jobs
                        );
                    };

                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(
                                dropped_table_ids.into_iter().map(|id| id.table_id as _),
                            )
                            .await;
                        info = self
                            .resolve_graph_info(None, &active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "resolve actor info failed");
                            })?
                    }

                    self.recovery_table_with_upstream_sinks(&mut info).await?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, fragments)| {
                                InflightFragmentInfo::existing_table_ids(fragments.values())
                            })
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(
                                info.values().flat_map(|jobs| {
                                    jobs.iter().filter_map(|(job_id, job)| {
                                        background_jobs
                                            .contains_key(job_id)
                                            .then_some((*job_id, job))
                                    })
                                }),
                                version,
                            )
                        })
                        .await?;

                    let mv_depended_subscriptions = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?;

                    let stream_actors = self.load_stream_actors(&info).await?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.keys())
                                .map(|fragment_id| *fragment_id as _)
                                .collect(),
                        )
                        .await?;
                    let background_jobs = {
                        let mut background_jobs = self
                            .list_background_job_progress(None)
                            .await
                            .context("recover background job progress should not fail")?;
                        info.values()
                            .flatten()
                            .filter_map(|(job_id, _)| {
                                background_jobs
                                    .remove(job_id)
                                    .map(|definition| (*job_id, definition))
                            })
                            .collect()
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

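                    // Collect the source split assignment currently recorded on each actor.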
                    let mut source_splits = HashMap::new();
                    for (_, fragment_infos) in info.values().flatten() {
                        for fragment in fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

                    let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
                        info.values().flat_map(|jobs| jobs.iter()),
                    );

                    let cdc_table_ids = cdc_table_backfill_actors
                        .keys()
                        .cloned()
                        .collect::<Vec<_>>();
                    let cdc_table_snapshot_split_assignment =
                        assign_cdc_table_snapshot_splits_pairs(
                            cdc_table_backfill_actors,
                            self.env.meta_store_ref(),
                            self.env.cdc_table_backfill_tracker.completed_job_ids(),
                        )
                        .await?;
                    let cdc_table_snapshot_split_assignment =
                        if cdc_table_snapshot_split_assignment.is_empty() {
                            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                        } else {
                            let generation = self
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(cdc_table_ids.into_iter());
                            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                                cdc_table_snapshot_split_assignment,
                                generation,
                            )
                        };
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        mv_depended_subscriptions,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_split_assignment,
                    })
                }
            }
        }
    }

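    /// Reload the runtime info snapshot for a single database during per-database recovery.
    /// Returns `Ok(None)` if the database has no in-flight streaming jobs.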
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids.into_iter().map(|id| id.table_id as _))
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let all_info = self
            .resolve_graph_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .get(&database_id)
            .cloned()
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

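    /// Build `StreamActor`s for all actors in the given graph info, attaching each job's
    /// definition and expression context (e.g. timezone) loaded from the catalog.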
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().map(|job_id| job_id.table_id as i32))
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let StreamingJobExtraInfo {
                timezone,
                job_definition,
            } = job_extra_info
                .get(&(job_id.table_id as i32))
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id.table_id))?;

            let expr_context = Some(StreamContext { timezone }.to_expr_context());

            for (fragment_id, fragment_info) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_info.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id as _,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: expr_context.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}