use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
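    /// Clean up dirty subscriptions and dirty creating streaming jobs in the catalog (left over
    /// from before this recovery), reset all refresh jobs to idle, and drop the sources that were
    /// associated with the cleaned-up jobs. Scoped to one database when `database_id` is `Some`,
    /// otherwise applied cluster-wide.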
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        // Drop the sources associated with the cleaned-up dirty jobs from the source manager.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

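    /// Ask the Hummock manager to purge state tables, passing the set of state table ids that are
    /// still referenced by the resolved streaming jobs (presumably so that any state table outside
    /// this set is removed from storage).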
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

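    /// List background streaming jobs that are still being created, returning a map from job id
    /// to the job's SQL definition.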
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

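    /// Load the inflight fragment info of all databases (or a single one, when `database_id` is
    /// `Some`) from the catalog, keyed by database id, job id and fragment id, using the given
    /// snapshot of active streaming worker nodes.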
    async fn resolve_database_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

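    /// From the current Hummock `version`, resolve the committed epoch of every state table, and,
    /// for each upstream table pinned by a recovered snapshot-backfill job that lags behind it,
    /// the change-log epochs between the downstream's committed epoch and the upstream's.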
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            // All state tables of a job must have been committed at the same epoch.
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    // The earliest epoch from which this job still needs the upstream change log:
                    // the later of its snapshot epoch and its own committed epoch.
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} is later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    // The downstream lags behind: collect the change-log epochs it still has to
                    // consume from the upstream table.
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        // The first resolved log entry's checkpoint epoch must match the
                        // downstream's committed epoch.
                        if !matches!(
                            epochs.first(),
                            Some((_, first_checkpoint_epoch))
                                if *first_checkpoint_epoch == downstream_committed_epoch
                        ) {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

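    /// Collect, per job, the actors of fragments flagged with `StreamCdcScan`, i.e. the actors
    /// that perform CDC table backfill and need snapshot splits re-assigned on recovery.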
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<JobId, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a JobId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_infos in fragments.values() {
                if fragment_infos
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(*job_id)
                        .or_default()
                        .extend(fragment_infos.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

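    /// For each user-created table among the inflight jobs, locate the fragment containing the
    /// `UpstreamSinkUnion` node and refill its upstream sink inputs from the catalog, so that
    /// sinks created into the table are wired up again after recovery. Tables created by old
    /// versions without such a fragment are skipped.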
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            // Find the fragment containing the `UpstreamSinkUnion` node, if any.
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "Table {} was created by an old version and has not been migrated yet, so sinks cannot be created or dropped on it.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

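    /// Rebuild the [`BarrierWorkerRuntimeInfoSnapshot`] used for global recovery: clean up dirty
    /// jobs, resolve the inflight actor/fragment info of every database, purge stale state tables
    /// from Hummock, and re-derive committed/log epochs, subscriptions, stream actors, source
    /// splits and CDC snapshot split assignments.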
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!("recovering background job progress");
        let background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;
        tracing::info!("recovered background job progress");

        // Mark pending drop/cancel commands as applied and finalize dropped tables.
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        // Background jobs whose backfill scan type cannot be rescheduled block recovery below.
        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        let mut info = if unreschedulable_jobs.is_empty() {
            info!("trigger offline scaling");
            self.resolve_database_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        } else {
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        };

        // If any drop/cancel commands arrived in the meantime, apply them and re-resolve.
        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            info = self
                .resolve_database_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            background_jobs
                                .contains_key(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = self.load_stream_actors(&info).await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.keys())
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // Keep only the background jobs whose inflight info was actually resolved.
        let background_jobs = {
            let mut background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    background_jobs
                        .remove(job_id)
                        .map(|definition| (*job_id, definition))
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        // Rebuild the source split assignment from the resolved actor info.
        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
            info.values().flat_map(|jobs| jobs.iter()),
        );

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            let generation = self
                .env
                .cdc_table_backfill_tracker
                .next_generation(cdc_table_ids.into_iter());
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                generation,
            )
        };

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

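    /// Per-database counterpart of [`Self::reload_runtime_info_impl`]: rebuild the
    /// [`DatabaseRuntimeInfoSnapshot`] for `database_id`, or return `Ok(None)` when the database
    /// has no inflight streaming jobs to recover.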
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // Apply pending drop/cancel commands of this database and finalize dropped tables.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let mut all_info = self
            .resolve_database_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // Rebuild the source split assignment from the resolved actor info.
        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

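    /// Build [`StreamActor`]s for every inflight actor by combining the per-actor vnode bitmap
    /// with the per-job extra info (definition, expression context, config override) loaded from
    /// the catalog.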
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_infos) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}