use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
    render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};

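/// Per-job recovery info for tables written to by sinks: the fragment containing the
/// `UpstreamSinkUnion` node and the upstream sink infos to refill it with.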
#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

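/// Catalog state loaded in a single read transaction for recovery: the fragment topology,
/// per-job extra info, upstream-sink recovery targets, and fragment downstream relations.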
#[derive(Debug)]
pub struct LoadedRecoveryContext {
    pub fragment_context: LoadedFragmentContext,
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    pub fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

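/// Runtime info rendered for a single database: per-job fragment info, the stream actors built
/// from it, and the source splits assigned to each actor.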
pub struct RenderedDatabaseRuntimeInfo {
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

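/// Renders the runtime info of a single database from a loaded recovery context by assigning
/// actors onto the currently active worker nodes. Returns `Ok(None)` if the context contains
/// nothing for `database_id`.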
pub fn render_runtime_info(
    actor_id_generator: &AtomicU32,
    worker_nodes: &ActiveStreamingWorkerNodes,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    recovery_context: &LoadedRecoveryContext,
    database_id: DatabaseId,
) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
    let Some(per_database_context) = recovery_context.fragment_context.for_database(database_id)
    else {
        return Ok(None);
    };

    assert!(!per_database_context.is_empty());

    let RenderedGraph { mut fragments, .. } = render_actor_assignments(
        actor_id_generator,
        worker_nodes.current(),
        adaptive_parallelism_strategy,
        &per_database_context,
    )?;

    let single_database = match fragments.remove(&database_id) {
        Some(info) => info,
        None => return Ok(None),
    };

    let mut database_map = HashMap::from([(database_id, single_database)]);
    recovery_table_with_upstream_sinks(
        &mut database_map,
        &recovery_context.upstream_sink_recovery,
    )?;
    let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;

    let job_infos = database_map
        .remove(&database_id)
        .expect("database entry must exist");

    let mut source_splits = HashMap::new();
    for fragment_infos in job_infos.values() {
        for fragment in fragment_infos.values() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }
    }

    Ok(Some(RenderedDatabaseRuntimeInfo {
        job_infos,
        stream_actors,
        source_splits,
    }))
}

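/// Refills the `UpstreamSinkUnion` node of each recovered table fragment with the upstream sink
/// infos loaded from the catalog. Fails on duplicate job ids or when the target fragment of a
/// recovery entry is missing.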
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow::anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

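/// Builds the `StreamActor` map for all rendered actors, combining per-actor assignments
/// (fragment id, vnode bitmap) with the per-job extra info (definition, expression context,
/// config override) preloaded from the catalog.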
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
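    /// Cleans up catalog state left behind by interrupted DDL: dirty subscriptions, dirty
    /// creating jobs (whose associated sources are then dropped from the source manager), and
    /// refresh jobs that are reset back to idle.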
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

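    /// Stops the sink coordinators of the given database, or resets the whole sink manager when
    /// no database is specified.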
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

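    /// Aborts dirty pending sink state: for every sink with pending epochs, looks up the
    /// committed epoch of its first state table in the current hummock version and aborts the
    /// pending epochs against it.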
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mgr = &self.metadata_manager;
        mgr.catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await
    }

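    /// Loads the full recovery context from the catalog in a single read transaction: the
    /// fragment context, per-job extra info, upstream-sink recovery targets (fragments that
    /// contain an `UpstreamSinkUnion` node), and fragment downstream relations.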
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context
            .job_fragments
            .values()
            .flat_map(|fragments| fragments.values())
        {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context
                    .job_fragments
                    .values()
                    .flat_map(|fragments| fragments.keys().copied())
                    .collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

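    /// Resolves, from the current hummock version, the committed epoch of every state table,
    /// plus the upstream change-log epoch ranges that snapshot-backfill background jobs still
    /// need to replay (from their downstream committed epoch up to the upstream one).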
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter = fragments
                    .values()
                    .flat_map(|fragment| fragment.state_table_ids.iter().copied());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

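    /// Reloads the global runtime info snapshot used to restart the barrier worker: cleans up
    /// dirty jobs and sink state, loads the recovery context, purges unreferenced state tables
    /// from hummock, and resolves committed and change-log epochs for background jobs.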
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!("recovering background job progress");
        let initial_background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = initial_background_jobs.iter().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable(false))
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        }

        let mut recovery_context = self.load_recovery_context(None).await?;
        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            recovery_context = self.load_recovery_context(None).await?;
        }

        self.purge_state_table_from_hummock(
            &recovery_context
                .fragment_context
                .job_fragments
                .values()
                .flat_map(|fragments| fragments.values())
                .flat_map(|fragment| fragment.state_table_ids.iter().copied())
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    recovery_context
                        .fragment_context
                        .job_fragments
                        .iter()
                        .filter_map(|(job_id, job)| {
                            initial_background_jobs
                                .contains(job_id)
                                .then_some((*job_id, job))
                        }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let background_jobs = {
            let mut refreshed_background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            recovery_context
                .fragment_context
                .job_map
                .keys()
                .filter_map(|job_id| {
                    refreshed_background_jobs.remove(job_id).then_some(*job_id)
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

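    /// Reloads the runtime info snapshot of a single database, mirroring
    /// `reload_runtime_info_impl` but scoped to `database_id`.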
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let missing_background_jobs = background_jobs
            .iter()
            .filter(|job_id| {
                !recovery_context
                    .fragment_context
                    .job_map
                    .contains_key(*job_id)
            })
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs.iter().filter_map(|job_id| {
                        recovery_context
                            .fragment_context
                            .job_fragments
                            .get(job_id)
                            .map(|job| (*job_id, job))
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(DatabaseRuntimeInfoSnapshot {
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            cdc_table_snapshot_splits,
        })
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                adaptive_parallelism_strategy: None,
                job_definition: "definition".to_owned(),
                backfill_orders: None,
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}