use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, RenderedGraph, WorkerInfo, render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};

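/// The target fragment (the one containing an `UpstreamSinkUnion` node) and the upstream sink
/// infos that need to be refilled into it during recovery.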
struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

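/// Catalog state loaded in a single read transaction that is needed to re-render actors and
/// rebuild the streaming graph during recovery.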
struct LoadedRecoveryContext {
    fragment_context: LoadedFragmentContext,
    job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

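/// Refill the `UpstreamSinkUnion` node of each recovered table job with the upstream sink infos
/// collected at load time. Returns an error if a job id appears more than once in the render map
/// or if the recorded target fragment is missing from the rendered fragments.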
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow::anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

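/// Build a `StreamActor` descriptor for every rendered actor, attaching the job definition,
/// expression context, and config override preloaded in `job_extra_info`. Fails if a rendered
/// job has no corresponding extra info.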
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
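    /// Clean up dirty subscriptions and dirty creating jobs left over from before the failure,
    /// reset refresh jobs to idle, and drop the sources associated with the cleaned-up jobs from
    /// the source manager.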
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

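    /// Stop the sink coordinators of the given database, or reset the whole sink manager when no
    /// database is specified (global recovery).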
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

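    /// Abort dirty pending sink state: for each sink with pending epochs, resolve the committed
    /// epoch of its first state table from the current hummock version and abort the pending
    /// epochs against it.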
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

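    /// List the ids of background (creating) streaming jobs whose progress must be re-tracked
    /// after recovery.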
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, _definition, _init_at)| job_id)
            .collect())
    }

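    /// Re-render actor assignments for the loaded fragments onto the currently schedulable
    /// streaming workers, using the configured adaptive parallelism strategy. When a specific
    /// database is requested, asserts that only that database appears in the rendered result.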
    fn render_actor_assignments(
        &self,
        database_id: Option<DatabaseId>,
        loaded: &LoadedFragmentContext,
        worker_nodes: &ActiveStreamingWorkerNodes,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ) -> MetaResult<FragmentRenderMap> {
        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        let available_workers: BTreeMap<_, _> = worker_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| {
                (
                    worker.id,
                    WorkerInfo {
                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                        resource_group: worker.resource_group(),
                    },
                )
            })
            .collect();

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.metadata_manager
                .catalog_controller
                .env
                .actor_id_generator(),
            &available_workers,
            adaptive_parallelism_strategy,
            loaded,
        )?;

        if let Some(database_id) = database_id {
            for loaded_database_id in fragments.keys() {
                assert_eq!(*loaded_database_id, database_id);
            }
        }

        Ok(fragments)
    }

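    /// Load everything required for recovery in a single read transaction: the fragment context,
    /// per-job extra info, upstream-sink recovery targets (fragments containing an
    /// `UpstreamSinkUnion` node), and fragment downstream relations.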
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context.fragment_map.values() {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.stream_node.to_protobuf(), |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context.fragment_map.keys().copied().collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

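    /// Resolve epochs from the current hummock version: the committed epoch of every state
    /// table, and, for each upstream table of a recovering snapshot-backfill job, the change-log
    /// epochs between the minimum downstream pinned epoch and the upstream committed epoch that
    /// still need to be replayed.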
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // The first resolved checkpoint epoch matches the downstream
                            // committed epoch, so the change log covers the gap.
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

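    /// Reload the full runtime info snapshot for global recovery: clean up dirty state, resolve
    /// background job progress, re-render actor assignments on the active streaming workers, and
    /// collect the epochs, actors, splits, and subscriptions needed to rebuild all databases.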
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!("recovering background job progress");
        let initial_background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let adaptive_parallelism_strategy = {
            let system_params_reader = self
                .metadata_manager
                .catalog_controller
                .env
                .system_params_reader()
                .await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = initial_background_jobs.iter().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        }

        info!("trigger offline re-rendering");
        let mut recovery_context = self.load_recovery_context(None).await?;

        let mut info = self
            .render_actor_assignments(
                None,
                &recovery_context.fragment_context,
                &active_streaming_nodes,
                adaptive_parallelism_strategy,
            )
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "render actor assignments failed");
            })?;

        info!("offline re-rendering completed");

        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            recovery_context = self.load_recovery_context(None).await?;
            info = self
                .render_actor_assignments(
                    None,
                    &recovery_context.fragment_context,
                    &active_streaming_nodes,
                    adaptive_parallelism_strategy,
                )
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "render actor assignments failed");
                })?;
        }

        recovery_table_with_upstream_sinks(&mut info, &recovery_context.upstream_sink_recovery)?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            initial_background_jobs
                                .contains(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = build_stream_actors(&info, &recovery_context.job_extra_info)?;

        let fragment_relations = recovery_context.fragment_relations;

        let background_jobs = {
            let mut refreshed_background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    refreshed_background_jobs.remove(job_id).then_some(*job_id)
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

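    /// Reload the runtime info snapshot for a single database during per-database recovery.
    /// Returns `None` when no fragments are rendered for the database.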
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let adaptive_parallelism_strategy = {
            let system_params_reader = self
                .metadata_manager
                .catalog_controller
                .env
                .system_params_reader()
                .await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let mut all_info = self
            .render_actor_assignments(
                Some(database_id),
                &recovery_context.fragment_context,
                &active_streaming_nodes,
                adaptive_parallelism_strategy,
            )
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "render actor assignments failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        recovery_table_with_upstream_sinks(
            &mut database_info,
            &recovery_context.upstream_sink_recovery,
        )?;

        assert!(database_info.len() <= 1);

        let stream_actors = build_stream_actors(&database_info, &recovery_context.job_extra_info)?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let missing_background_jobs = background_jobs
            .iter()
            .filter(|job_id| !info.contains_key(job_id))
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .iter()
                        .filter_map(|job_id| info.get(job_id).map(|job| (*job_id, job))),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = recovery_context.fragment_relations;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_splits,
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                job_definition: "definition".to_owned(),
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}
1072}