use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, RenderedGraph, WorkerInfo, render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};

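/// Per-job info used to re-attach upstream sinks to a recovered table: the fragment
/// containing the `UpstreamSinkUnion` node and the upstream sink descriptors to refill.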
struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

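/// Catalog state loaded in one read transaction for recovery: the fragment context,
/// per-job extra info, upstream-sink recovery targets, and fragment downstream relations.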
struct LoadedRecoveryContext {
    fragment_context: LoadedFragmentContext,
    job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

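/// Refills the `UpstreamSinkUnion` nodes of recovered table fragments with the loaded
/// upstream sink infos. Errors on duplicate job ids or a missing target fragment.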
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow::anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

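/// Builds a `StreamActor` for every rendered actor by combining its vnode bitmap with
/// the owning job's definition, expression context, and config override.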
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
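    /// Cleans up metadata left dirty by an interrupted run: drops dirty subscriptions and
    /// creating jobs, resets refresh jobs to idle, and unregisters the associated sources.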
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

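    /// Stops the sink coordinators of the given database, or resets the whole sink manager
    /// when no database is specified.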
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

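    /// Aborts pending sink epochs by rolling each pending sink back to the epoch committed
    /// in the current hummock version of its first state table.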
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

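    /// Asks the hummock manager to purge state tables, given the set of state table ids
    /// that remain in use after recovery.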
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

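    /// Lists background creating jobs and returns a map from job id to its definition.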
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

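    /// Re-renders actor assignments for the loaded fragments onto the currently schedulable
    /// workers, honoring the adaptive parallelism strategy. Returns an empty map if nothing
    /// was loaded.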
    fn render_actor_assignments(
        &self,
        database_id: Option<DatabaseId>,
        loaded: &LoadedFragmentContext,
        worker_nodes: &ActiveStreamingWorkerNodes,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ) -> MetaResult<FragmentRenderMap> {
        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        let available_workers: BTreeMap<_, _> = worker_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| {
                (
                    worker.id,
                    WorkerInfo {
                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                        resource_group: worker.resource_group(),
                    },
                )
            })
            .collect();

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.metadata_manager
                .catalog_controller
                .env
                .actor_id_generator(),
            &available_workers,
            adaptive_parallelism_strategy,
            loaded,
        )?;

        if let Some(database_id) = database_id {
            for loaded_database_id in fragments.keys() {
                assert_eq!(*loaded_database_id, database_id);
            }
        }

        Ok(fragments)
    }

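    /// Loads the recovery context in a single read transaction: fragment context, per-job
    /// extra info, `UpstreamSinkUnion` targets with their upstream sink infos, and fragment
    /// downstream relations.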
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context.fragment_map.values() {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.stream_node.to_protobuf(), |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context.fragment_map.keys().copied().collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

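    /// Resolves, from the given hummock version, the committed epoch of every state table
    /// and the change-log epochs that snapshot-backfill background jobs still need from
    /// their upstream tables.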
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

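    /// Reloads the global runtime info snapshot for recovery: cleans dirty metadata, resets
    /// sink state, re-renders actor assignments on the active workers, purges unused state
    /// tables, and resolves hummock epochs for background jobs.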
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    self.reset_sink_coordinator(None)
                        .await
                        .context("reset sink coordinator")?;
                    self.abort_dirty_pending_sink_state(None)
                        .await
                        .context("abort dirty pending sink state")?;

                    tracing::info!("recovering background job progress");
                    let initial_background_jobs = self
                        .list_background_job_progress(None)
                        .await
                        .context("recover background job progress should not fail")?;

                    tracing::info!("recovered background job progress");

                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let adaptive_parallelism_strategy = {
                        let system_params_reader = self
                            .metadata_manager
                            .catalog_controller
                            .env
                            .system_params_reader()
                            .await;
                        system_params_reader.adaptive_parallelism_strategy()
                    };

                    let active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs =
                        initial_background_jobs.keys().cloned().collect_vec();

                    tracing::info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                        bail!(
                            "Recovery for unreschedulable background jobs is not yet implemented. \
                            This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                            unreschedulable_jobs
                        );
                    }

                    info!("trigger offline re-rendering");
                    let mut recovery_context = self.load_recovery_context(None).await?;

                    let mut info = self
                        .render_actor_assignments(
                            None,
                            &recovery_context.fragment_context,
                            &active_streaming_nodes,
                            adaptive_parallelism_strategy,
                        )
                        .inspect_err(|err| {
                            warn!(error = %err.as_report(), "render actor assignments failed");
                        })?;

                    info!("offline re-rendering completed");

                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(dropped_table_ids)
                            .await;
                        recovery_context = self.load_recovery_context(None).await?;
                        info = self
                            .render_actor_assignments(
                                None,
                                &recovery_context.fragment_context,
                                &active_streaming_nodes,
                                adaptive_parallelism_strategy,
                            )
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "render actor assignments failed");
                            })?
                    }

                    recovery_table_with_upstream_sinks(
                        &mut info,
                        &recovery_context.upstream_sink_recovery,
                    )?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, fragments)| {
                                InflightFragmentInfo::existing_table_ids(fragments.values())
                            })
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(
                                info.values().flat_map(|jobs| {
                                    jobs.iter().filter_map(|(job_id, job)| {
                                        initial_background_jobs
                                            .contains_key(job_id)
                                            .then_some((*job_id, job))
                                    })
                                }),
                                version,
                            )
                        })
                        .await?;

                    let mv_depended_subscriptions = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?;

                    let stream_actors =
                        build_stream_actors(&info, &recovery_context.job_extra_info)?;

                    let fragment_relations = recovery_context.fragment_relations;

                    let background_jobs = {
                        let mut refreshed_background_jobs = self
                            .list_background_job_progress(None)
                            .await
                            .context("recover background job progress should not fail")?;
                        info.values()
                            .flatten()
                            .filter_map(|(job_id, _)| {
                                refreshed_background_jobs
                                    .remove(job_id)
                                    .map(|definition| (*job_id, definition))
                            })
                            .collect()
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    let mut source_splits = HashMap::new();
                    for (_, fragment_infos) in info.values().flatten() {
                        for fragment in fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

                    let cdc_table_snapshot_splits =
                        reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
                            .await?;

                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        mv_depended_subscriptions,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_splits,
                    })
                }
            }
        }
    }

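    /// Reloads the runtime info snapshot of a single database for per-database recovery.
    /// Returns `None` if the database has no streaming jobs after re-rendering.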
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let adaptive_parallelism_strategy = {
            let system_params_reader = self
                .metadata_manager
                .catalog_controller
                .env
                .system_params_reader()
                .await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let mut all_info = self
            .render_actor_assignments(
                Some(database_id),
                &recovery_context.fragment_context,
                &active_streaming_nodes,
                adaptive_parallelism_strategy,
            )
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "render actor assignments failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        recovery_table_with_upstream_sinks(
            &mut database_info,
            &recovery_context.upstream_sink_recovery,
        )?;

        assert!(database_info.len() <= 1);

        let stream_actors = build_stream_actors(&database_info, &recovery_context.job_extra_info)?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let missing_background_jobs = background_jobs
            .keys()
            .filter(|job_id| !info.contains_key(job_id))
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .filter_map(|job_id| info.get(job_id).map(|job| (*job_id, job))),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = recovery_context.fragment_relations;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_splits,
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                job_definition: "definition".to_owned(),
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}