1use std::cmp::{Ordering, max, min};
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::atomic::AtomicU32;
19
20use anyhow::{Context, anyhow};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::id::JobId;
25use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
26use risingwave_connector::source::SplitImpl;
27use risingwave_hummock_sdk::change_log::TableChangeLogs;
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_meta_model::SinkId;
30use risingwave_pb::stream_plan::stream_node::PbNodeBody;
31use sea_orm::TransactionTrait;
32use thiserror_ext::AsReport;
33use tracing::{info, warn};
34
35use super::BarrierWorkerRuntimeInfoSnapshot;
36use crate::MetaResult;
37use crate::barrier::DatabaseRuntimeInfoSnapshot;
38use crate::barrier::checkpoint::{
39 BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, BatchRefreshRenderResult,
40};
41use crate::barrier::context::GlobalBarrierWorkerContextImpl;
42use crate::barrier::rpc::to_partial_graph_id;
43use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
44use crate::controller::scale::{
45 FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
46 render_actor_assignments,
47};
48use crate::controller::utils::StreamingJobExtraInfo;
49use crate::manager::ActiveStreamingWorkerNodes;
50use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
51use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
52use crate::stream::cdc::reload_cdc_table_snapshot_splits;
53use crate::stream::{
54 SourceChange, StreamFragmentGraph, UpstreamSinkInfo, cleanup_dropped_streaming_jobs,
55};
56
/// Data needed to refill the `UpstreamSinkUnion` node of a table job during recovery.
#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    /// The fragment of the table job whose stream node contains the `UpstreamSinkUnion` node.
    target_fragment_id: FragmentId,
    /// Upstream sinks to be written back into that `UpstreamSinkUnion` node.
    upstream_infos: Vec<UpstreamSinkInfo>,
}
62
/// Catalog metadata loaded for recovery, before any actors are rendered.
#[derive(Debug)]
pub struct LoadedRecoveryContext {
    /// Fragments, jobs and related models of the recovered database(s).
    pub fragment_context: LoadedFragmentContext,
    /// Per-job extra info (definition, stream context, config override, ...).
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// Table jobs that own an `UpstreamSinkUnion` fragment, with the sinks to refill into it.
    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    /// Downstream relations of the loaded fragments.
    pub fragment_relations: FragmentDownstreamRelation,
}
70
71impl LoadedRecoveryContext {
72 fn empty(fragment_context: LoadedFragmentContext) -> Self {
73 Self {
74 fragment_context,
75 job_extra_info: HashMap::new(),
76 upstream_sink_recovery: HashMap::new(),
77 fragment_relations: FragmentDownstreamRelation::default(),
78 }
79 }
80}
81
/// Actor-level runtime information rendered for a single database during recovery.
pub struct RenderedDatabaseRuntimeInfo {
    /// Fragment infos (with rendered actors) of the regular, non batch refresh jobs.
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    /// Stream actor descriptors for every actor in `job_infos`.
    pub stream_actors: HashMap<ActorId, StreamActor>,
    /// Source splits of every actor in `job_infos` (an empty vec when the actor has none).
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Render results of batch refresh jobs, which are rendered separately.
    pub batch_refresh: HashMap<JobId, BatchRefreshRenderResult>,
}
89
90pub fn render_runtime_info(
91 actor_id_generator: &AtomicU32,
92 worker_nodes: &ActiveStreamingWorkerNodes,
93 recovery_context: &LoadedRecoveryContext,
94 database_id: DatabaseId,
95) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
96 let Some(mut per_database_context) =
97 recovery_context.fragment_context.for_database(database_id)
98 else {
99 return Ok(None);
100 };
101
102 assert!(!per_database_context.is_empty());
103
104 let batch_refresh_job_ids: HashSet<JobId> = per_database_context
107 .job_map
108 .iter()
109 .filter(|(_, model)| model.refresh_interval_sec.is_some())
110 .map(|(job_id, _)| *job_id)
111 .collect();
112
113 let mut batch_refresh_logical = HashMap::new();
114 if !batch_refresh_job_ids.is_empty() {
115 let batch_refresh_fragment_ids: HashSet<FragmentId> = batch_refresh_job_ids
116 .iter()
117 .flat_map(|job_id| {
118 per_database_context
119 .job_fragments
120 .get(job_id)
121 .unwrap()
122 .keys()
123 .copied()
124 })
125 .collect();
126
127 for &job_id in &batch_refresh_job_ids {
128 let fragments = per_database_context.job_fragments.remove(&job_id).unwrap();
129 let downstreams = fragments
130 .keys()
131 .filter_map(|fid| {
132 recovery_context
133 .fragment_relations
134 .get(fid)
135 .map(|r| (*fid, r.clone()))
136 })
137 .collect();
138 batch_refresh_logical.insert(
139 job_id,
140 BatchRefreshLogicalFragments {
141 fragments,
142 downstreams,
143 },
144 );
145 }
146
147 per_database_context.ensembles.retain(|ensemble| {
149 !ensemble
150 .component_fragments()
151 .all(|fid| batch_refresh_fragment_ids.contains(&fid))
152 });
153 }
154
155 let mut batch_refresh = HashMap::new();
157 for (job_id, logical) in batch_refresh_logical {
158 let extra = recovery_context
159 .job_extra_info
160 .get(&job_id)
161 .expect("should have extra info");
162 let streaming_job_model = per_database_context
163 .job_map
164 .get(&job_id)
165 .expect("should have streaming job model");
166 let database_model = &per_database_context.database_map[&database_id];
167 let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
168
169 let render_result = BatchRefreshJobCheckpointControl::render_actors_and_build_job_info(
170 &logical.fragments,
171 &logical.downstreams,
172 &extra.job_definition,
173 actor_id_generator,
174 worker_nodes.current(),
175 &database_model.resource_group,
176 streaming_job_model,
177 partial_graph_id,
178 )?;
179
180 batch_refresh.insert(job_id, render_result);
181 }
182
183 if per_database_context.ensembles.is_empty() {
185 return Ok(Some(RenderedDatabaseRuntimeInfo {
186 job_infos: HashMap::new(),
187 stream_actors: HashMap::new(),
188 source_splits: HashMap::new(),
189 batch_refresh,
190 }));
191 }
192
193 let RenderedGraph { mut fragments, .. } = render_actor_assignments(
194 actor_id_generator,
195 worker_nodes.current(),
196 &per_database_context,
197 )?;
198
199 let single_database = match fragments.remove(&database_id) {
200 Some(info) => info,
201 None => return Ok(None),
202 };
203
204 let mut database_map = HashMap::from([(database_id, single_database)]);
205 recovery_table_with_upstream_sinks(
206 &mut database_map,
207 &recovery_context.upstream_sink_recovery,
208 )?;
209 let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;
210
211 let job_infos = database_map
212 .remove(&database_id)
213 .expect("database entry must exist");
214
215 let mut source_splits = HashMap::new();
216 for fragment_infos in job_infos.values() {
217 for fragment in fragment_infos.values() {
218 for (actor_id, info) in &fragment.actors {
219 source_splits.insert(*actor_id, info.splits.clone());
220 }
221 }
222 }
223
224 Ok(Some(RenderedDatabaseRuntimeInfo {
225 job_infos,
226 stream_actors,
227 source_splits,
228 batch_refresh,
229 }))
230}
231
232fn recovery_table_with_upstream_sinks(
237 inflight_jobs: &mut FragmentRenderMap,
238 upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
239) -> MetaResult<()> {
240 if upstream_sink_recovery.is_empty() {
241 return Ok(());
242 }
243
244 let mut seen_jobs = HashSet::new();
245
246 for jobs in inflight_jobs.values_mut() {
247 for (job_id, fragments) in jobs {
248 if !seen_jobs.insert(*job_id) {
249 return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
250 }
251
252 if let Some(recovery) = upstream_sink_recovery.get(job_id) {
253 if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
254 refill_upstream_sink_union_in_table(
255 &mut target_fragment.nodes,
256 &recovery.upstream_infos,
257 );
258 } else {
259 return Err(anyhow::anyhow!(
260 "target fragment {} not found for upstream sink recovery of job {}",
261 recovery.target_fragment_id,
262 job_id
263 )
264 .into());
265 }
266 }
267 }
268 }
269
270 Ok(())
271}
272
273fn build_stream_actors(
279 all_info: &FragmentRenderMap,
280 job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
281) -> MetaResult<HashMap<ActorId, StreamActor>> {
282 let mut stream_actors = HashMap::new();
283
284 for (job_id, streaming_info) in all_info.values().flatten() {
285 let extra_info = job_extra_info
286 .get(job_id)
287 .cloned()
288 .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
289 let expr_context = extra_info.stream_context().to_expr_context();
290 let job_definition = extra_info.job_definition;
291 let config_override = extra_info.config_override;
292
293 for (fragment_id, fragment_infos) in streaming_info {
294 for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
295 stream_actors.insert(
296 *actor_id,
297 StreamActor {
298 actor_id: *actor_id,
299 fragment_id: *fragment_id,
300 vnode_bitmap: vnode_bitmap.clone(),
301 mview_definition: job_definition.clone(),
302 expr_context: Some(expr_context.clone()),
303 config_override: config_override.clone(),
304 },
305 );
306 }
307 }
308 }
309 Ok(stream_actors)
310}
311
312impl GlobalBarrierWorkerContextImpl {
313 async fn apply_pre_applied_drop_cancel(
314 &self,
315 database_id: Option<DatabaseId>,
316 ) -> MetaResult<bool> {
317 let drop_cancel = self.scheduled_barriers.pre_apply_drop_cancel(database_id);
318 let has_drop_streaming_jobs = !drop_cancel.streaming_job_ids.is_empty();
319 cleanup_dropped_streaming_jobs(
320 &self.refresh_manager,
321 &self.hummock_manager,
322 &self.metadata_manager,
323 drop_cancel.streaming_job_ids,
324 drop_cancel.dropped_state_table_ids,
325 "drop_streaming_jobs",
326 )
327 .await?;
328 Ok(has_drop_streaming_jobs)
329 }
330
331 async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
333 self.metadata_manager
334 .catalog_controller
335 .clean_dirty_subscription(database_id)
336 .await?;
337
338 let cleaned_dirty_jobs = self
339 .metadata_manager
340 .catalog_controller
341 .clean_dirty_creating_jobs(database_id)
342 .await?;
343 if database_id.is_some() {
344 cleanup_dropped_streaming_jobs(
347 &self.refresh_manager,
348 &self.hummock_manager,
349 &self.metadata_manager,
350 cleaned_dirty_jobs.streaming_job_ids,
351 cleaned_dirty_jobs.dropped_table_ids,
352 "clean_dirty_creating_jobs",
353 )
354 .await?;
355 }
356 self.metadata_manager
357 .reset_all_refresh_jobs_to_idle()
358 .await?;
359
360 self.source_manager
362 .apply_source_change(SourceChange::DropSource {
363 dropped_source_ids: cleaned_dirty_jobs.source_ids,
364 })
365 .await;
366
367 Ok(())
368 }
369
370 async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
371 if let Some(database_id) = database_id {
372 let sink_ids = self
373 .metadata_manager
374 .catalog_controller
375 .list_sink_ids(Some(database_id))
376 .await?;
377 self.sink_manager.stop_sink_coordinator(sink_ids).await;
378 } else {
379 self.sink_manager.reset().await;
380 }
381 Ok(())
382 }
383
384 async fn abort_dirty_pending_sink_state(
385 &self,
386 database_id: Option<DatabaseId>,
387 ) -> MetaResult<()> {
388 let pending_sinks: HashSet<SinkId> = self
389 .metadata_manager
390 .catalog_controller
391 .list_all_pending_sinks(database_id)
392 .await?;
393
394 if pending_sinks.is_empty() {
395 return Ok(());
396 }
397
398 let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
399 .metadata_manager
400 .catalog_controller
401 .fetch_sink_with_state_table_ids(pending_sinks)
402 .await?;
403
404 let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();
405
406 for (sink_id, table_ids) in sink_with_state_tables {
407 let Some(table_id) = table_ids.first() else {
408 return Err(anyhow!("no state table id in sink: {}", sink_id).into());
409 };
410
411 self.hummock_manager
412 .on_current_version(|version| -> MetaResult<()> {
413 if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
414 assert!(
415 sink_committed_epoch
416 .insert(sink_id, committed_epoch)
417 .is_none()
418 );
419 Ok(())
420 } else {
421 Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
422 }
423 })
424 .await?;
425 }
426
427 self.metadata_manager
428 .catalog_controller
429 .abort_pending_sink_epochs(sink_committed_epoch)
430 .await?;
431
432 Ok(())
433 }
434
435 async fn purge_state_table_from_hummock(
436 &self,
437 all_state_table_ids: &HashSet<TableId>,
438 ) -> MetaResult<()> {
439 self.hummock_manager.purge(all_state_table_ids).await?;
440 Ok(())
441 }
442
443 async fn list_background_job_progress(
444 &self,
445 database_id: Option<DatabaseId>,
446 ) -> MetaResult<HashSet<JobId>> {
447 let mgr = &self.metadata_manager;
448 mgr.catalog_controller
449 .list_background_creating_jobs(false, database_id)
450 .await
451 }
452
453 async fn load_recovery_context(
454 &self,
455 database_id: Option<DatabaseId>,
456 ) -> MetaResult<LoadedRecoveryContext> {
457 let inner = self
458 .metadata_manager
459 .catalog_controller
460 .get_inner_read_guard()
461 .await;
462 let txn = inner.db.begin().await?;
463
464 let fragment_context = self
465 .metadata_manager
466 .catalog_controller
467 .load_fragment_context_in_txn(&txn, database_id)
468 .await
469 .inspect_err(|err| {
470 warn!(error = %err.as_report(), "load fragment context failed");
471 })?;
472
473 if fragment_context.is_empty() {
474 return Ok(LoadedRecoveryContext::empty(fragment_context));
475 }
476
477 let job_ids = fragment_context.job_map.keys().copied().collect_vec();
478 let job_extra_info = self
479 .metadata_manager
480 .catalog_controller
481 .get_streaming_job_extra_info_in_txn(&txn, job_ids)
482 .await?;
483
484 let mut upstream_targets = HashMap::new();
485 for fragment in fragment_context
486 .job_fragments
487 .values()
488 .flat_map(|fragments| fragments.values())
489 {
490 let mut has_upstream_union = false;
491 visit_stream_node_cont(&fragment.nodes, |node| {
492 if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
493 has_upstream_union = true;
494 false
495 } else {
496 true
497 }
498 });
499
500 if has_upstream_union
501 && let Some(previous) =
502 upstream_targets.insert(fragment.job_id, fragment.fragment_id)
503 {
504 bail!(
505 "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
506 fragment.job_id,
507 fragment.fragment_id,
508 previous
509 );
510 }
511 }
512
513 let mut upstream_sink_recovery = HashMap::new();
514 if !upstream_targets.is_empty() {
515 let tables = self
516 .metadata_manager
517 .catalog_controller
518 .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
519 .await?;
520
521 for table in tables {
522 let job_id = table.id.as_job_id();
523 let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
524 tracing::debug!(
526 job_id = %job_id,
527 "upstream sink union target fragment not found for table"
528 );
529 continue;
530 };
531
532 let upstream_infos = self
533 .metadata_manager
534 .catalog_controller
535 .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
536 .await?;
537
538 upstream_sink_recovery.insert(
539 job_id,
540 UpstreamSinkRecoveryInfo {
541 target_fragment_id: *target_fragment_id,
542 upstream_infos,
543 },
544 );
545 }
546 }
547
548 let fragment_relations = self
549 .metadata_manager
550 .catalog_controller
551 .get_fragment_downstream_relations_in_txn(
552 &txn,
553 fragment_context
554 .job_fragments
555 .values()
556 .flat_map(|fragments| fragments.keys().copied())
557 .collect_vec(),
558 )
559 .await?;
560
561 Ok(LoadedRecoveryContext {
562 fragment_context,
563 job_extra_info,
564 upstream_sink_recovery,
565 fragment_relations,
566 })
567 }
568
569 #[expect(clippy::type_complexity)]
570 fn resolve_hummock_version_epochs(
571 background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
572 version: &HummockVersion,
573 table_change_log: &TableChangeLogs,
574 ) -> MetaResult<(
575 HashMap<TableId, u64>,
576 HashMap<TableId, Vec<(Vec<u64>, u64)>>,
577 )> {
578 let table_committed_epoch: HashMap<_, _> = version
579 .state_table_info
580 .info()
581 .iter()
582 .map(|(table_id, info)| (*table_id, info.committed_epoch))
583 .collect();
584 let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
585 Ok(*table_committed_epoch
586 .get(&table_id)
587 .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
588 };
589 let mut min_downstream_committed_epochs = HashMap::new();
590 for (job_id, fragments) in background_jobs {
591 let job_committed_epoch = {
592 let mut table_id_iter = fragments
593 .values()
594 .flat_map(|fragment| fragment.state_table_ids.iter().copied());
595 let Some(first_table_id) = table_id_iter.next() else {
596 bail!("job {} has no state table", job_id);
597 };
598 let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
599 for table_id in table_id_iter {
600 let table_committed_epoch = get_table_committed_epoch(table_id)?;
601 if job_committed_epoch != table_committed_epoch {
602 bail!(
603 "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
604 first_table_id,
605 job_committed_epoch,
606 table_id,
607 table_committed_epoch,
608 job_id
609 );
610 }
611 }
612
613 job_committed_epoch
614 };
615 if let (Some(snapshot_backfill_info), _) =
616 StreamFragmentGraph::collect_snapshot_backfill_info_impl(
617 fragments
618 .values()
619 .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
620 )?
621 {
622 for (upstream_table, snapshot_epoch) in
623 snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
624 {
625 let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
626 anyhow!(
627 "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
628 job_id, upstream_table
629 )
630 })?;
631 let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
632 match min_downstream_committed_epochs.entry(upstream_table) {
633 Entry::Occupied(entry) => {
634 let prev_min_epoch = entry.into_mut();
635 *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
636 }
637 Entry::Vacant(entry) => {
638 entry.insert(pinned_epoch);
639 }
640 }
641 }
642 }
643 }
644 let mut log_epochs = HashMap::new();
645 for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
646 let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
647 match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
648 Ordering::Less => {
649 bail!(
650 "downstream epoch {} later than upstream epoch {} of table {}",
651 downstream_committed_epoch,
652 upstream_committed_epoch,
653 upstream_table_id
654 );
655 }
656 Ordering::Equal => {
657 continue;
658 }
659 Ordering::Greater => {
660 if let Some(table_change_log) = table_change_log.get(&upstream_table_id) {
661 let epochs = table_change_log
662 .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
663 .map(|epoch_log| {
664 (
665 epoch_log.non_checkpoint_epochs.clone(),
666 epoch_log.checkpoint_epoch,
667 )
668 })
669 .collect_vec();
670 let first_epochs = epochs.first();
671 if let Some((_, first_checkpoint_epoch)) = &first_epochs
672 && *first_checkpoint_epoch == downstream_committed_epoch
673 {
674 } else {
675 bail!(
676 "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
677 epochs,
678 upstream_table_id,
679 downstream_committed_epoch
680 );
681 }
682 log_epochs
683 .try_insert(upstream_table_id, epochs)
684 .expect("non-duplicated");
685 } else {
686 bail!(
687 "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
688 upstream_table_id,
689 upstream_committed_epoch,
690 downstream_committed_epoch
691 );
692 }
693 }
694 }
695 }
696 Ok((table_committed_epoch, log_epochs))
697 }
698
699 pub(super) async fn reload_runtime_info_impl(
700 &self,
701 ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
702 {
703 {
704 {
705 self.clean_dirty_streaming_jobs(None)
706 .await
707 .context("clean dirty streaming jobs")?;
708
709 self.reset_sink_coordinator(None)
710 .await
711 .context("reset sink coordinator")?;
712 self.abort_dirty_pending_sink_state(None)
713 .await
714 .context("abort dirty pending sink state")?;
715
716 tracing::info!("recovering background job progress");
718 let initial_background_jobs = self
719 .list_background_job_progress(None)
720 .await
721 .context("recover background job progress should not fail")?;
722
723 tracing::info!("recovered background job progress");
724
725 let _ = self.apply_pre_applied_drop_cancel(None).await?;
727 self.metadata_manager
728 .catalog_controller
729 .cleanup_dropped_tables()
730 .await;
731
732 let active_streaming_nodes =
733 ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
734 .await?;
735
736 let background_streaming_jobs =
737 initial_background_jobs.iter().cloned().collect_vec();
738
739 tracing::info!(
740 "background streaming jobs: {:?} total {}",
741 background_streaming_jobs,
742 background_streaming_jobs.len()
743 );
744
745 let unreschedulable_jobs = {
746 let mut unreschedulable_jobs = HashSet::new();
747
748 for job_id in background_streaming_jobs {
749 let scan_types = self
750 .metadata_manager
751 .get_job_backfill_scan_types(job_id)
752 .await?;
753
754 if scan_types
755 .values()
756 .any(|scan_type| !scan_type.is_reschedulable(false))
757 {
758 unreschedulable_jobs.insert(job_id);
759 }
760 }
761
762 unreschedulable_jobs
763 };
764
765 if !unreschedulable_jobs.is_empty() {
766 info!(
767 "unreschedulable background jobs: {:?}",
768 unreschedulable_jobs
769 );
770 }
771
772 if !unreschedulable_jobs.is_empty() {
776 bail!(
777 "Recovery for unreschedulable background jobs is not yet implemented. \
778 This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
779 unreschedulable_jobs
780 );
781 }
782
783 let mut recovery_context = self.load_recovery_context(None).await?;
784 if self.apply_pre_applied_drop_cancel(None).await? {
785 recovery_context = self.load_recovery_context(None).await?;
786 }
787
788 self.purge_state_table_from_hummock(
789 &recovery_context
790 .fragment_context
791 .job_fragments
792 .values()
793 .flat_map(|fragments| fragments.values())
794 .flat_map(|fragment| fragment.state_table_ids.iter().copied())
795 .collect(),
796 )
797 .await
798 .context("purge state table from hummock")?;
799
800 let (state_table_committed_epochs, state_table_log_epochs) = self
801 .hummock_manager
802 .on_current_version_and_table_change_log(|version, table_change_log| {
803 Self::resolve_hummock_version_epochs(
804 recovery_context
805 .fragment_context
806 .job_fragments
807 .iter()
808 .filter_map(|(job_id, job)| {
809 initial_background_jobs
810 .contains(job_id)
811 .then_some((*job_id, job))
812 }),
813 version,
814 table_change_log,
815 )
816 })
817 .await?;
818
819 let mv_depended_subscriptions = self
820 .metadata_manager
821 .get_mv_depended_subscriptions(None)
822 .await?;
823
824 let background_jobs = {
826 let mut refreshed_background_jobs = self
827 .list_background_job_progress(None)
828 .await
829 .context("recover background job progress should not fail")?;
830 recovery_context
831 .fragment_context
832 .job_map
833 .keys()
834 .filter_map(|job_id| {
835 refreshed_background_jobs.remove(job_id).then_some(*job_id)
836 })
837 .collect()
838 };
839
840 let database_infos = self
841 .metadata_manager
842 .catalog_controller
843 .list_databases()
844 .await?;
845
846 let cdc_table_snapshot_splits =
847 reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
848 .await?;
849
850 Ok(BarrierWorkerRuntimeInfoSnapshot {
851 active_streaming_nodes,
852 recovery_context,
853 state_table_committed_epochs,
854 state_table_log_epochs,
855 mv_depended_subscriptions,
856 background_jobs,
857 hummock_version_stats: self.hummock_manager.get_version_stats().await,
858 database_infos,
859 cdc_table_snapshot_splits,
860 })
861 }
862 }
863 }
864 }
865
866 pub(super) async fn reload_database_runtime_info_impl(
867 &self,
868 database_id: DatabaseId,
869 ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
870 self.clean_dirty_streaming_jobs(Some(database_id))
871 .await
872 .context("clean dirty streaming jobs")?;
873
874 self.reset_sink_coordinator(Some(database_id))
875 .await
876 .context("reset sink coordinator")?;
877 self.abort_dirty_pending_sink_state(Some(database_id))
878 .await
879 .context("abort dirty pending sink state")?;
880
881 tracing::info!(
883 ?database_id,
884 "recovering background job progress of database"
885 );
886
887 let background_jobs = self
888 .list_background_job_progress(Some(database_id))
889 .await
890 .context("recover background job progress of database should not fail")?;
891 tracing::info!(?database_id, "recovered background job progress");
892
893 let _ = self
895 .apply_pre_applied_drop_cancel(Some(database_id))
896 .await?;
897
898 let recovery_context = self.load_recovery_context(Some(database_id)).await?;
899
900 let missing_background_jobs = background_jobs
901 .iter()
902 .filter(|job_id| {
903 !recovery_context
904 .fragment_context
905 .job_map
906 .contains_key(*job_id)
907 })
908 .copied()
909 .collect_vec();
910 if !missing_background_jobs.is_empty() {
911 warn!(
912 database_id = %database_id,
913 missing_job_ids = ?missing_background_jobs,
914 "background jobs missing in rendered info"
915 );
916 }
917
918 let (state_table_committed_epochs, state_table_log_epochs) = self
919 .hummock_manager
920 .on_current_version_and_table_change_log(|version, table_change_log| {
921 Self::resolve_hummock_version_epochs(
922 background_jobs.iter().filter_map(|job_id| {
923 recovery_context
924 .fragment_context
925 .job_fragments
926 .get(job_id)
927 .map(|job| (*job_id, job))
928 }),
929 version,
930 table_change_log,
931 )
932 })
933 .await?;
934
935 let mv_depended_subscriptions = self
936 .metadata_manager
937 .get_mv_depended_subscriptions(Some(database_id))
938 .await?;
939
940 let cdc_table_snapshot_splits =
941 reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
942 .await?;
943
944 self.refresh_manager
945 .remove_trackers_by_database(database_id);
946
947 Ok(DatabaseRuntimeInfoSnapshot {
948 recovery_context,
949 state_table_committed_epochs,
950 state_table_log_epochs,
951 mv_depended_subscriptions,
952 background_jobs,
953 cdc_table_snapshot_splits,
954 })
955 }
956}
957
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    /// Builds a render map holding exactly one fragment for `job_id` in `database_id`.
    fn single_fragment_map(
        database_id: DatabaseId,
        job_id: JobId,
        fragment: InflightFragmentInfo,
    ) -> FragmentRenderMap {
        let fragment_id = fragment.fragment_id;
        let mut render_map: FragmentRenderMap = HashMap::new();
        render_map
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);
        render_map
    }

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let union_fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let union_node = PbStreamNode {
            node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                PbUpstreamSinkUnionNode {
                    init_upstreams: vec![],
                },
            ))),
            ..Default::default()
        };
        let mut inflight_jobs = single_fragment_map(
            database_id,
            job_id,
            InflightFragmentInfo {
                fragment_id: union_fragment_id,
                distribution_type: DistributionType::Hash,
                fragment_type_mask: FragmentTypeMask::empty(),
                vnode_count: 1,
                nodes: union_node,
                actors: HashMap::new(),
                state_table_ids: HashSet::new(),
            },
        );

        let sink_info = UpstreamSinkInfo {
            sink_id: SinkId::new(1),
            sink_fragment_id,
            sink_output_fields: vec![],
            sink_original_target_columns: vec![],
            project_exprs: vec![],
            new_sink_downstream: DownstreamFragmentRelation {
                downstream_fragment_id: FragmentId::new(300),
                dispatcher_type: DispatcherType::Hash,
                dist_key_indices: vec![],
                output_mapping: PbDispatchOutputMapping::default(),
            },
        };
        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: union_fragment_id,
                upstream_infos: vec![sink_info],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let refilled = &inflight_jobs[&database_id][&job_id][&union_fragment_id];
        let Some(PbNodeBody::UpstreamSinkUnion(refilled_union)) =
            refilled.nodes.node_body.as_ref()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(refilled_union.init_upstreams.len(), 1);
        assert_eq!(
            refilled_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let actor_info = InflightActorInfo {
            worker_id: WorkerId::new(1),
            vnode_bitmap: None,
            splits: vec![],
        };
        let inflight_jobs = single_fragment_map(
            database_id,
            job_id,
            InflightFragmentInfo {
                fragment_id,
                distribution_type: DistributionType::Hash,
                fragment_type_mask: FragmentTypeMask::empty(),
                vnode_count: 1,
                nodes: PbStreamNode::default(),
                actors: HashMap::from([(actor_id, actor_info)]),
                state_table_ids: HashSet::new(),
            },
        );

        let extra_info = StreamingJobExtraInfo {
            timezone: Some("UTC".to_owned()),
            config_override: "cfg".into(),
            job_definition: "definition".to_owned(),
            backfill_orders: None,
            refresh_interval_sec: None,
        };
        let job_extra_info = HashMap::from([(job_id, extra_info)]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let built = &stream_actors[&actor_id];
        assert_eq!(built.actor_id, actor_id);
        assert_eq!(built.fragment_id, fragment_id);
        assert_eq!(built.mview_definition, "definition");
        assert_eq!(&*built.config_override, "cfg");
        let expr_ctx = built.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}