use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor, StreamContext};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{REFRESH_TABLE_PROGRESS_TRACKER, SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
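    /// Clean up streaming jobs left in a dirty (partially created or partially dropped)
    /// state, and drop the source state associated with them, so that recovery starts
    /// from consistent catalog metadata.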
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

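    /// Purge state tables that no longer belong to any streaming job from the Hummock
    /// (state store) metadata.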
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

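    /// List background (creating) streaming jobs together with their definitions,
    /// optionally scoped to a single database.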
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

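    /// Resolve the in-flight fragment info of all (or one) database's streaming jobs
    /// from the catalog, keyed as database -> job -> fragment.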
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

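    /// Derive from the current Hummock version the committed epoch of every state table
    /// and, for snapshot-backfill background jobs, the change-log epochs between the
    /// downstream committed epoch and the upstream table's committed epoch.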
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch of table {}", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled the snapshot epoch for upstream table {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} is later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // The resolved change log starts exactly at the downstream
                            // committed epoch, as expected.
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} at epoch {} is ahead of downstream at epoch {} but has no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

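    /// Collect, per streaming job, the actors that run a `StreamCdcScan` fragment,
    /// i.e. the actors performing CDC table backfill.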
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<JobId, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a JobId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_info in fragments.values() {
                if fragment_info
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(*job_id)
                        .or_insert_with(HashSet::new)
                        .extend(fragment_info.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

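    /// For every user table that can receive data from sinks (i.e. contains an
    /// `UpstreamSinkUnion` node), refill that union fragment with the current upstream
    /// sink infos so the recovered stream graph reflects the sinks created into the table.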
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "table {} was created by an old version and has not yet been migrated, so sinks cannot be created or dropped on it",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

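    /// Reload the full runtime info snapshot used for global recovery: clean up dirty
    /// state, resolve the in-flight graph, derive epochs from Hummock, and gather the
    /// auxiliary state (actors, splits, subscriptions, CDC split assignments) that the
    /// barrier worker needs to restart all databases.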
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!("recovering background job progress");
        let background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress")?;

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?}, total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        // A background job cannot be rescheduled if any of its backfill scan types is
        // not reschedulable.
        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        let mut info = if unreschedulable_jobs.is_empty() {
            info!("trigger offline scaling");
            self.resolve_graph_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        } else {
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        };

        // Jobs may have been dropped or cancelled while resolving the graph; if so,
        // complete the drops and resolve the graph again.
        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            info = self
                .resolve_graph_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?;
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            background_jobs
                                .contains_key(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = self.load_stream_actors(&info).await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.keys())
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // Re-list background jobs and keep only those still present in the resolved
        // graph info.
        let background_jobs = {
            let mut background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    background_jobs
                        .remove(job_id)
                        .map(|definition| (*job_id, definition))
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_backfill_actors =
            Self::collect_cdc_table_backfill_actors(info.values().flat_map(|jobs| jobs.iter()));

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            let generation = self
                .env
                .cdc_table_backfill_tracker
                .next_generation(cdc_table_ids.into_iter());
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                generation,
            )
        };
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

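    /// Reload the runtime info snapshot of a single database for per-database recovery.
    /// Returns `None` if the database has no in-flight streaming jobs left.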
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(?database_id, "recovering background job progress of database");

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let all_info = self
            .resolve_graph_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .get(&database_id)
            .cloned()
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

        REFRESH_TABLE_PROGRESS_TRACKER
            .lock()
            .remove_tracker_by_database_id(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

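    /// Rebuild the `StreamActor` descriptors for every actor in the given graph info,
    /// attaching each job's definition and expression context (timezone).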
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let StreamingJobExtraInfo {
                timezone,
                job_definition,
            } = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;

            let expr_context = Some(StreamContext { timezone }.to_expr_context());

            for (fragment_id, fragment_info) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_info.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: expr_context.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}