use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, RenderedGraph, WorkerInfo, render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};

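/// Per-job information needed to re-attach upstream sinks to a table's `UpstreamSinkUnion`
/// fragment during recovery.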
struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

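/// Everything loaded from the catalog in a single read transaction that is needed to rebuild
/// runtime info during recovery: the fragment context, per-job extra info, upstream-sink
/// recovery targets, and fragment downstream relations.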
struct LoadedRecoveryContext {
    fragment_context: LoadedFragmentContext,
    job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }

    fn backfill_orders(&self) -> HashMap<JobId, HashMap<FragmentId, Vec<FragmentId>>> {
        self.job_extra_info
            .iter()
            .map(|(job_id, extra_info)| {
                (
                    *job_id,
                    extra_info.backfill_orders.clone().unwrap_or_default().0,
                )
            })
            .collect()
    }
}

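/// Refills the `UpstreamSinkUnion` node of each recovered table fragment with the upstream sink
/// infos loaded from the catalog. Fails if a job id appears more than once across databases or
/// if the target fragment of a recovery entry is missing from the rendered jobs.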
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow::anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

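/// Builds the `StreamActor` descriptors for all rendered actors, combining the per-fragment actor
/// assignments with the per-job extra info (definition, expression context, config override).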
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
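    /// Cleans up dirty catalog state left over from interrupted DDL: drops dirty subscriptions
    /// and creating jobs, resets refresh jobs to idle, and unregisters sources associated with
    /// the dropped creating jobs.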
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

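    /// Stops sink coordinators: only those belonging to the given database when `database_id` is
    /// `Some`, otherwise resets the whole sink manager.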
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

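    /// Aborts dirty pending sink state: for each sink that still has pending epochs, resolves the
    /// committed epoch of its first state table from the current Hummock version and hands the
    /// result to `abort_pending_sink_epochs`.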
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

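    /// Thin wrapper around `HummockManager::purge` for the given set of state table ids.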
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

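    /// Lists the ids of background (creating) streaming jobs, optionally scoped to a single
    /// database.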
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mgr = &self.metadata_manager;
        mgr.catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await
    }

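    /// Renders actor assignments for the loaded fragment context against the currently
    /// schedulable streaming workers, using the configured adaptive parallelism strategy. When a
    /// `database_id` is given, asserts that the rendered result only contains that database.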
    fn render_actor_assignments(
        &self,
        database_id: Option<DatabaseId>,
        loaded: &LoadedFragmentContext,
        worker_nodes: &ActiveStreamingWorkerNodes,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ) -> MetaResult<FragmentRenderMap> {
        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        let available_workers: BTreeMap<_, _> = worker_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| {
                (
                    worker.id,
                    WorkerInfo {
                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                        resource_group: worker.resource_group(),
                    },
                )
            })
            .collect();

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.metadata_manager
                .catalog_controller
                .env
                .actor_id_generator(),
            &available_workers,
            adaptive_parallelism_strategy,
            loaded,
        )?;

        if let Some(database_id) = database_id {
            for loaded_database_id in fragments.keys() {
                assert_eq!(*loaded_database_id, database_id);
            }
        }

        Ok(fragments)
    }

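    /// Loads the full recovery context within a single catalog read transaction: the fragment
    /// context, per-job extra info, upstream-sink-union recovery targets, and fragment downstream
    /// relations. Returns an empty context early when no fragments are loaded.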
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context.fragment_map.values() {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.stream_node.to_protobuf(), |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}: fragment {} conflicts with fragment {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context.fragment_map.keys().copied().collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

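    /// Resolves, from the current Hummock `version`, the committed epoch of every state table and
    /// the change-log epochs that each snapshot-backfill background job still needs from its
    /// upstream tables. Fails if the epochs of a job's state tables are inconsistent or if a
    /// required change log is missing.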
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled the snapshot epoch for upstream table {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} is later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // The change log starts exactly at the downstream committed epoch,
                            // as expected.
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has a lagging downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

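    /// Reloads the global runtime info snapshot used to bootstrap the barrier worker after
    /// recovery: cleans dirty catalog and sink state, re-renders actor assignments for all
    /// databases, refills upstream sink unions, purges state tables from Hummock, and resolves
    /// committed and change-log epochs for background jobs.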
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!("recovering background job progress");
        let initial_background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let adaptive_parallelism_strategy = {
            let system_params_reader = self
                .metadata_manager
                .catalog_controller
                .env
                .system_params_reader()
                .await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = initial_background_jobs.iter().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            info!("unreschedulable background jobs: {:?}", unreschedulable_jobs);
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        }

        info!("trigger offline re-rendering");
        let mut recovery_context = self.load_recovery_context(None).await?;

        let mut info = self
            .render_actor_assignments(
                None,
                &recovery_context.fragment_context,
                &active_streaming_nodes,
                adaptive_parallelism_strategy,
            )
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "render actor assignments failed");
            })?;

        info!("offline re-rendering completed");

        // If any jobs were dropped or cancelled while rendering, reload the context and
        // re-render so the snapshot does not contain stale fragments.
        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            recovery_context = self.load_recovery_context(None).await?;
            info = self
                .render_actor_assignments(
                    None,
                    &recovery_context.fragment_context,
                    &active_streaming_nodes,
                    adaptive_parallelism_strategy,
                )
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "render actor assignments failed");
                })?
        }

        recovery_table_with_upstream_sinks(
            &mut info,
            &recovery_context.upstream_sink_recovery,
        )?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            initial_background_jobs
                                .contains(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = build_stream_actors(&info, &recovery_context.job_extra_info)?;

        let backfill_orders = recovery_context.backfill_orders();
        let fragment_relations = recovery_context.fragment_relations;

        let background_jobs = {
            let mut refreshed_background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    refreshed_background_jobs.remove(job_id).then_some(*job_id)
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        // Collect per-actor source split assignments from the rendered info.
        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            backfill_orders,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

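    /// Same as [`Self::reload_runtime_info_impl`], but scoped to a single database. Returns
    /// `Ok(None)` when the database has no streaming jobs to recover.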
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let adaptive_parallelism_strategy = {
            let system_params_reader = self
                .metadata_manager
                .catalog_controller
                .env
                .system_params_reader()
                .await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let mut all_info = self
            .render_actor_assignments(
                Some(database_id),
                &recovery_context.fragment_context,
                &active_streaming_nodes,
                adaptive_parallelism_strategy,
            )
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "render actor assignments failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        recovery_table_with_upstream_sinks(
            &mut database_info,
            &recovery_context.upstream_sink_recovery,
        )?;

        assert!(database_info.len() <= 1);

        let stream_actors = build_stream_actors(&database_info, &recovery_context.job_extra_info)?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let missing_background_jobs = background_jobs
            .iter()
            .filter(|job_id| !info.contains_key(job_id))
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .iter()
                        .filter_map(|job_id| info.get(job_id).map(|job| (*job_id, job))),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let backfill_orders = recovery_context.backfill_orders();
        let fragment_relations = recovery_context.fragment_relations;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            backfill_orders,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_splits,
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                job_definition: "definition".to_owned(),
                backfill_orders: None,
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}