use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::TableChangeLogs;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
    render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{
    SourceChange, StreamFragmentGraph, UpstreamSinkInfo, cleanup_dropped_streaming_jobs,
};

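/// Per-job information required to re-attach upstream sinks to the
/// `UpstreamSinkUnion` fragment of a recovered table.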
#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

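/// Everything loaded from the catalog in one transaction that is needed to
/// re-render the runtime info of a single database or of the whole cluster.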
#[derive(Debug)]
pub struct LoadedRecoveryContext {
    pub fragment_context: LoadedFragmentContext,
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    pub fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

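/// Rendered runtime info of a single database: fragment infos per job, the
/// materialized `StreamActor`s, and the source splits assigned to each actor.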
pub struct RenderedDatabaseRuntimeInfo {
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

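/// Renders actor assignments for `database_id` on the currently active worker
/// nodes. Returns `Ok(None)` when the database has nothing to recover.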
pub fn render_runtime_info(
    actor_id_generator: &AtomicU32,
    worker_nodes: &ActiveStreamingWorkerNodes,
    recovery_context: &LoadedRecoveryContext,
    database_id: DatabaseId,
) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
    let Some(per_database_context) = recovery_context.fragment_context.for_database(database_id)
    else {
        return Ok(None);
    };

    assert!(!per_database_context.is_empty());

    let RenderedGraph { mut fragments, .. } = render_actor_assignments(
        actor_id_generator,
        worker_nodes.current(),
        &per_database_context,
    )?;

    let Some(single_database) = fragments.remove(&database_id) else {
        return Ok(None);
    };

    let mut database_map = HashMap::from([(database_id, single_database)]);
    recovery_table_with_upstream_sinks(
        &mut database_map,
        &recovery_context.upstream_sink_recovery,
    )?;
    let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;

    let job_infos = database_map
        .remove(&database_id)
        .expect("database entry must exist");

    let mut source_splits = HashMap::new();
    for fragment_infos in job_infos.values() {
        for fragment in fragment_infos.values() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }
    }

    Ok(Some(RenderedDatabaseRuntimeInfo {
        job_infos,
        stream_actors,
        source_splits,
    }))
}

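/// Refills the `UpstreamSinkUnion` node of each recovered table fragment with
/// the upstream sink infos loaded from the catalog. Fails on duplicate job ids
/// or when a job's target fragment is missing.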
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow!("duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

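/// Builds the `StreamActor` descriptor for every rendered actor, attaching the
/// job definition, expression context, and config override preloaded in
/// `job_extra_info`.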
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
    async fn apply_pre_applied_drop_cancel(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<bool> {
        let drop_cancel = self.scheduled_barriers.pre_apply_drop_cancel(database_id);
        let has_drop_streaming_jobs = !drop_cancel.streaming_job_ids.is_empty();
        cleanup_dropped_streaming_jobs(
            &self.refresh_manager,
            &self.hummock_manager,
            &self.metadata_manager,
            drop_cancel.streaming_job_ids,
            drop_cancel.dropped_state_table_ids,
            "drop_streaming_jobs",
        )
        .await?;
        Ok(has_drop_streaming_jobs)
    }

    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
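        // Clean up dirty subscriptions before cleaning the dirty creating jobs.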
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;

        let cleaned_dirty_jobs = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        if database_id.is_some() {
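            // Release the resources held by the jobs that were just cleaned up.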
247 cleanup_dropped_streaming_jobs(
250 &self.refresh_manager,
251 &self.hummock_manager,
252 &self.metadata_manager,
253 cleaned_dirty_jobs.streaming_job_ids,
254 cleaned_dirty_jobs.dropped_table_ids,
255 "clean_dirty_creating_jobs",
256 )
257 .await?;
258 }
259 self.metadata_manager
260 .reset_all_refresh_jobs_to_idle()
261 .await?;
262
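        // Unregister the sources owned by the cleaned-up jobs.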
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: cleaned_dirty_jobs.source_ids,
            })
            .await;

        Ok(())
    }

    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

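    /// Aborts sink epochs that were still pending when the cluster failed, by
    /// rolling each pending sink back to the committed epoch of its first state
    /// table in the current hummock version.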
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mgr = &self.metadata_manager;
        mgr.catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await
    }

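    /// Loads the full recovery context from the catalog inside a single
    /// transaction, so that fragments, job extra info, upstream sink infos, and
    /// fragment relations form a consistent snapshot.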
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

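        // Locate, per job, the unique fragment containing an `UpstreamSinkUnion`
        // node; its upstreams must be refilled from the catalog when rendering.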
        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context
            .job_fragments
            .values()
            .flat_map(|fragments| fragments.values())
        {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}: fragment {} conflicts with fragment {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context
                    .job_fragments
                    .values()
                    .flat_map(|fragments| fragments.keys().copied())
                    .collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

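    /// Resolves, from the current hummock version, (1) the committed epoch of
    /// every state table and (2) for each upstream table of a snapshot-backfill
    /// job, the change-log epochs the lagging downstream still has to consume.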
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
        version: &HummockVersion,
        table_change_log: &TableChangeLogs,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter = fragments
                    .values()
                    .flat_map(|fragment| fragment.state_table_ids.iter().copied());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
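            // For a snapshot-backfill job, each upstream MV must retain change
            // logs from max(snapshot epoch, job committed epoch); take the min
            // of that bound across all downstream jobs per upstream table.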
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
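        // Check that each upstream table's change log actually covers the gap
        // between the downstream's and the upstream's committed epochs, and
        // collect the epochs the downstream still has to replay.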
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = table_change_log.get(&upstream_table_id) {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epoch_matches = matches!(
                            epochs.first(),
                            Some((_, first_checkpoint_epoch))
                                if *first_checkpoint_epoch == downstream_committed_epoch
                        );
                        if !first_epoch_matches {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!("recovering background job progress");
        let initial_background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        let _ = self.apply_pre_applied_drop_cancel(None).await?;
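        // Clean up metadata of tables that have already been dropped.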
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = initial_background_jobs.iter().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable(false))
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }
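        // Recovery cannot yet handle background jobs whose backfill scans are
        // not reschedulable, so fail fast here instead of rendering them.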
675 if !unreschedulable_jobs.is_empty() {
679 bail!(
680 "Recovery for unreschedulable background jobs is not yet implemented. \
681 This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
682 unreschedulable_jobs
683 );
684 }
685
686 let mut recovery_context = self.load_recovery_context(None).await?;
687 if self.apply_pre_applied_drop_cancel(None).await? {
688 recovery_context = self.load_recovery_context(None).await?;
689 }
690
691 self.purge_state_table_from_hummock(
692 &recovery_context
693 .fragment_context
694 .job_fragments
695 .values()
696 .flat_map(|fragments| fragments.values())
697 .flat_map(|fragment| fragment.state_table_ids.iter().copied())
698 .collect(),
699 )
700 .await
701 .context("purge state table from hummock")?;
702
703 let (state_table_committed_epochs, state_table_log_epochs) = self
704 .hummock_manager
705 .on_current_version_and_table_change_log(|version, table_change_log| {
706 Self::resolve_hummock_version_epochs(
707 recovery_context
708 .fragment_context
709 .job_fragments
710 .iter()
711 .filter_map(|(job_id, job)| {
712 initial_background_jobs
713 .contains(job_id)
714 .then_some((*job_id, job))
715 }),
716 version,
717 table_change_log,
718 )
719 })
720 .await?;
721
722 let mv_depended_subscriptions = self
723 .metadata_manager
724 .get_mv_depended_subscriptions(None)
725 .await?;
726
727 let background_jobs = {
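            // Re-list in-progress jobs: the drop-cancel applied above may have
            // removed some of the initially listed ones.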
            let mut refreshed_background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            recovery_context
                .fragment_context
                .job_map
                .keys()
                .filter_map(|job_id| refreshed_background_jobs.remove(job_id).then_some(*job_id))
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(?database_id, "recovering background job progress of database");

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let _ = self
            .apply_pre_applied_drop_cancel(Some(database_id))
            .await?;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let missing_background_jobs = background_jobs
            .iter()
            .filter(|job_id| {
                !recovery_context
                    .fragment_context
                    .job_map
                    .contains_key(*job_id)
            })
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version_and_table_change_log(|version, table_change_log| {
                Self::resolve_hummock_version_epochs(
                    background_jobs.iter().filter_map(|job_id| {
                        recovery_context
                            .fragment_context
                            .job_fragments
                            .get(job_id)
                            .map(|job| (*job_id, job))
                    }),
                    version,
                    table_change_log,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(DatabaseRuntimeInfoSnapshot {
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            cdc_table_snapshot_splits,
        })
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                job_definition: "definition".to_owned(),
                backfill_orders: None,
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}