use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::TableChangeLogs;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
    render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};

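/// Information needed to restore the `UpstreamSinkUnion` node of a sink-into-table target
/// during recovery: the fragment that hosts the union node and the upstream sink infos to
/// refill into it.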
#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

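/// Everything loaded from the catalog (in one read transaction) that is needed to re-render
/// the streaming graph during recovery.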
#[derive(Debug)]
pub struct LoadedRecoveryContext {
    pub fragment_context: LoadedFragmentContext,
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    pub fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

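/// Per-database result of rendering actor assignments onto the current workers: fragment
/// info grouped by job, the materialized `StreamActor`s, and the source splits assigned to
/// each actor.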
pub struct RenderedDatabaseRuntimeInfo {
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

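/// Renders the runtime info of one database from the loaded recovery context: assigns
/// actors to the given worker nodes, refills upstream-sink union nodes, and builds the
/// `StreamActor`s and per-actor source splits. Returns `Ok(None)` when the database has
/// nothing to render.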
pub fn render_runtime_info(
    actor_id_generator: &AtomicU32,
    worker_nodes: &ActiveStreamingWorkerNodes,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    recovery_context: &LoadedRecoveryContext,
    database_id: DatabaseId,
) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
    let Some(per_database_context) = recovery_context.fragment_context.for_database(database_id)
    else {
        return Ok(None);
    };

    assert!(!per_database_context.is_empty());

    let RenderedGraph { mut fragments, .. } = render_actor_assignments(
        actor_id_generator,
        worker_nodes.current(),
        adaptive_parallelism_strategy,
        &per_database_context,
    )?;

    let single_database = match fragments.remove(&database_id) {
        Some(info) => info,
        None => return Ok(None),
    };

    let mut database_map = HashMap::from([(database_id, single_database)]);
    recovery_table_with_upstream_sinks(
        &mut database_map,
        &recovery_context.upstream_sink_recovery,
    )?;
    let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;

    let job_infos = database_map
        .remove(&database_id)
        .expect("database entry must exist");

    let mut source_splits = HashMap::new();
    for fragment_infos in job_infos.values() {
        for fragment in fragment_infos.values() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }
    }

    Ok(Some(RenderedDatabaseRuntimeInfo {
        job_infos,
        stream_actors,
        source_splits,
    }))
}

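/// Refills the `UpstreamSinkUnion` node of every sink-into-table target with the recovered
/// upstream sink infos, so the rendered graph matches the state before the failure. Fails
/// if a job id appears more than once or its target fragment is missing.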
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow::anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

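/// Builds a `StreamActor` for every rendered actor, attaching the job definition,
/// expression context and config override preloaded for its job. Fails if a job in the
/// render map has no preloaded extra info.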
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
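    /// Cleans up subscriptions and creating jobs left dirty by the failure, resets refresh
    /// jobs to idle, and drops the sources associated with the cleaned-up jobs. `None` means
    /// all databases.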
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

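    /// Stops the sink coordinators of the given database, or resets the whole sink manager
    /// when no database is specified.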
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

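    /// For every sink that still has pending epochs, resolves the committed epoch of its
    /// first state table from the current hummock version and aborts the dirty pending sink
    /// epochs accordingly.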
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

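    /// Purges state tables from hummock based on the given set of state table ids collected
    /// from the surviving streaming jobs (delegates to `HummockManager::purge`).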
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

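    /// Lists the background streaming jobs (e.g. backfilling jobs created with background
    /// DDL) that are still in progress and whose creation progress must be recovered.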
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mgr = &self.metadata_manager;
        mgr.catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await
    }

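    /// Loads the recovery context from the catalog in a single read transaction: fragment
    /// info, per-job extra info, fragment downstream relations, and the upstream sink infos
    /// needed to refill `UpstreamSinkUnion` nodes of sink-into-table targets.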
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context
            .job_fragments
            .values()
            .flat_map(|fragments| fragments.values())
        {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context
                    .job_fragments
                    .values()
                    .flat_map(|fragments| fragments.keys().copied())
                    .collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

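    /// Resolves, from the current hummock version, the committed epoch of every state table,
    /// and, for snapshot-backfill background jobs, the change-log epochs of each upstream
    /// table between the downstream's committed epoch and the upstream's, so that backfill
    /// can resume from the right point.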
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
        version: &HummockVersion,
        table_change_log: &TableChangeLogs,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter = fragments
                    .values()
                    .flat_map(|fragment| fragment.state_table_ids.iter().copied());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = table_change_log.get(&upstream_table_id) {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

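    /// Reloads the full runtime info snapshot for global recovery: cleans dirty state,
    /// recovers background job progress, reloads the recovery context, purges stale state
    /// tables from hummock, and resolves the committed and change-log epochs needed to
    /// resume barriers.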
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!("recovering background job progress");
        let initial_background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = initial_background_jobs.iter().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable(false))
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        }

        let mut recovery_context = self.load_recovery_context(None).await?;
        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            recovery_context = self.load_recovery_context(None).await?;
        }

        self.purge_state_table_from_hummock(
            &recovery_context
                .fragment_context
                .job_fragments
                .values()
                .flat_map(|fragments| fragments.values())
                .flat_map(|fragment| fragment.state_table_ids.iter().copied())
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version_and_table_change_log(|version, table_change_log| {
                Self::resolve_hummock_version_epochs(
                    recovery_context
                        .fragment_context
                        .job_fragments
                        .iter()
                        .filter_map(|(job_id, job)| {
                            initial_background_jobs
                                .contains(job_id)
                                .then_some((*job_id, job))
                        }),
                    version,
                    table_change_log,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let background_jobs = {
            let mut refreshed_background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            recovery_context
                .fragment_context
                .job_map
                .keys()
                .filter_map(|job_id| {
                    refreshed_background_jobs.remove(job_id).then_some(*job_id)
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

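    /// Reloads the runtime info snapshot of a single database, mirroring
    /// `reload_runtime_info_impl` but scoped to that database's jobs, sinks, subscriptions
    /// and CDC table snapshot splits.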
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let missing_background_jobs = background_jobs
            .iter()
            .filter(|job_id| {
                !recovery_context
                    .fragment_context
                    .job_map
                    .contains_key(*job_id)
            })
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version_and_table_change_log(|version, table_change_log| {
                Self::resolve_hummock_version_epochs(
                    background_jobs.iter().filter_map(|job_id| {
                        recovery_context
                            .fragment_context
                            .job_fragments
                            .get(job_id)
                            .map(|job| (*job_id, job))
                    }),
                    version,
                    table_change_log,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(DatabaseRuntimeInfoSnapshot {
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            cdc_table_snapshot_splits,
        })
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                adaptive_parallelism_strategy: None,
                job_definition: "definition".to_owned(),
                backfill_orders: None,
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}