risingwave_meta/barrier/context/recovery.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, RenderedGraph, WorkerInfo, render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};

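/// Per-job information needed to rebuild the `UpstreamSinkUnion` operator during recovery: the
/// fragment that hosts the operator and the upstream sink descriptors to refill into it.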
struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

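/// Metadata preloaded from the catalog that the later recovery stages consume without further
/// catalog access: the fragment context to render, per-job extra info for building actors,
/// `UpstreamSinkUnion` refill info, and fragment downstream relations.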
struct LoadedRecoveryContext {
    fragment_context: LoadedFragmentContext,
    job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

/// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the
/// newly added or removed upstreams are not persisted in the meta store. Therefore, when restoring
/// jobs, the information required by the operator must be rebuilt from the current state of its
/// upstreams (sinks) and its downstream (table). All necessary metadata must be preloaded before
/// rendering.
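///
/// A minimal sketch of the intended call order during recovery (`ctx`, `workers` and `strategy`
/// are illustrative bindings, not part of this module's API):
///
/// ```ignore
/// // Render actor assignments first, then refill every `UpstreamSinkUnion` operator from the
/// // preloaded per-job recovery info before the rendered map is used any further.
/// let mut info = self.render_actor_assignments(None, &ctx.fragment_context, &workers, strategy)?;
/// recovery_table_with_upstream_sinks(&mut info, &ctx.upstream_sink_recovery)?;
/// ```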
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow::anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

/// Assembles `StreamActor` instances from rendered fragment info and job context.
///
/// This function combines the actor assignments from `FragmentRenderMap` with
/// runtime context (timezone, config, definition) from `StreamingJobExtraInfo`
/// to produce the final `StreamActor` structures needed for recovery.
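///
/// Illustrative usage, mirroring the call sites in this file (`info` and `recovery_context` are
/// whatever the caller has already rendered and loaded):
///
/// ```ignore
/// let stream_actors = build_stream_actors(&info, &recovery_context.job_extra_info)?;
/// // Each entry now carries the actor's fragment id, vnode bitmap, definition and expression context.
/// ```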
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
    /// Clean up the catalogs of creating streaming jobs that are in foreground mode or whose
    /// table fragments have not been persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

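    /// Aborts the epochs of sinks that still have pending (uncommitted) coordination state from
    /// before the restart: for each pending sink, the committed epoch of its first state table is
    /// read from the current Hummock version, and anything newer is aborted in the catalog.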
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

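    /// Purges stale state tables from Hummock, keeping only the given set of state tables that
    /// are still in use after recovery.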
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

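    /// Lists background creating streaming jobs (optionally scoped to one database) as a map from
    /// job id to job definition, which is later used to restore creation progress tracking.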
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

    /// Synchronous render stage: uses the loaded context and the currently active workers to
    /// produce actor assignments.
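    ///
    /// Conceptually (the bindings below are illustrative; the real inputs come from
    /// `load_recovery_context` and `ActiveStreamingWorkerNodes`):
    ///
    /// ```ignore
    /// // Only schedulable streaming workers participate; each contributes its parallelism and
    /// // resource group, and the strategy decides the parallelism of adaptive fragments.
    /// let RenderedGraph { fragments, .. } =
    ///     render_actor_assignments(actor_id_gen, &available_workers, strategy, loaded)?;
    /// ```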
    fn render_actor_assignments(
        &self,
        database_id: Option<DatabaseId>,
        loaded: &LoadedFragmentContext,
        worker_nodes: &ActiveStreamingWorkerNodes,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ) -> MetaResult<FragmentRenderMap> {
        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        let available_workers: BTreeMap<_, _> = worker_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| {
                (
                    worker.id,
                    WorkerInfo {
                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                        resource_group: worker.resource_group(),
                    },
                )
            })
            .collect();

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.metadata_manager
                .catalog_controller
                .env
                .actor_id_generator(),
            &available_workers,
            adaptive_parallelism_strategy,
            loaded,
        )?;

        if let Some(database_id) = database_id {
            for loaded_database_id in fragments.keys() {
                assert_eq!(*loaded_database_id, database_id);
            }
        }

        Ok(fragments)
    }

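    /// Loads everything the later recovery stages need from the catalog within a single read
    /// transaction: the fragment context to render, per-job extra info for building actors, the
    /// target fragments of `UpstreamSinkUnion` operators together with their upstream sink infos,
    /// and the fragment downstream relations.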
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context.fragment_map.values() {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.stream_node.to_protobuf(), |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}: fragment {} conflicts with previously recorded fragment {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    // This should not happen unless catalog changes or legacy metadata are involved.
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context.fragment_map.keys().copied().collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

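    /// Resolves, from the current Hummock `version`, the committed epoch of every state table,
    /// plus the change-log epochs that snapshot-backfill background jobs still have to consume
    /// from their upstream tables.
    ///
    /// A worked example of the pinning rule below (the epoch values are made up): a backfill job
    /// with `snapshot_epoch = 100` and committed epoch `90` pins `max(100, 90) = 100` on its
    /// upstream table; if another downstream job pins `80`, the upstream keeps the minimum,
    /// `min(100, 80) = 80`, and its change log must start exactly at that epoch and extend to the
    /// upstream committed epoch.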
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

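    /// Reloads the full runtime snapshot used to bootstrap recovery: cleans up dirty jobs and sink
    /// state, loads the recovery context, re-renders actor assignments on the currently active
    /// workers, refills `UpstreamSinkUnion` operators, purges stale Hummock state tables, and
    /// resolves committed/log epochs before assembling the `BarrierWorkerRuntimeInfoSnapshot`.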
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    self.reset_sink_coordinator(None)
                        .await
                        .context("reset sink coordinator")?;
                    self.abort_dirty_pending_sink_state(None)
                        .await
                        .context("abort dirty pending sink state")?;

                    // Background job progress needs to be recovered.
                    tracing::info!("recovering background job progress");
                    let initial_background_jobs = self
                        .list_background_job_progress(None)
                        .await
                        .context("recover background job progress should not fail")?;

                    tracing::info!("recovered background job progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let adaptive_parallelism_strategy = {
                        let system_params_reader = self
                            .metadata_manager
                            .catalog_controller
                            .env
                            .system_params_reader()
                            .await;
                        system_params_reader.adaptive_parallelism_strategy()
                    };

                    let active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs =
                        initial_background_jobs.keys().cloned().collect_vec();

                    tracing::info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there is no actor to recover, most of the
                    // following steps will be no-ops, while the compute nodes will still be reset.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    if !unreschedulable_jobs.is_empty() {
                        bail!(
                            "Recovery for unreschedulable background jobs is not yet implemented. \
                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                            unreschedulable_jobs
                        );
                    }

                    info!("trigger offline re-rendering");
                    let mut recovery_context = self.load_recovery_context(None).await?;

                    let mut info = self
                        .render_actor_assignments(
                            None,
                            &recovery_context.fragment_context,
                            &active_streaming_nodes,
                            adaptive_parallelism_strategy,
                        )
                        .inspect_err(|err| {
                            warn!(error = %err.as_report(), "render actor assignments failed");
                        })?;

                    info!("offline re-rendering completed");

                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(dropped_table_ids)
                            .await;
                        recovery_context = self.load_recovery_context(None).await?;
                        info = self
                            .render_actor_assignments(
                                None,
                                &recovery_context.fragment_context,
                                &active_streaming_nodes,
                                adaptive_parallelism_strategy,
                            )
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "render actor assignments failed");
                            })?
                    }

                    recovery_table_with_upstream_sinks(
                        &mut info,
                        &recovery_context.upstream_sink_recovery,
                    )?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, fragments)| {
                                InflightFragmentInfo::existing_table_ids(fragments.values())
                            })
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(
                                info.values().flat_map(|jobs| {
                                    jobs.iter().filter_map(|(job_id, job)| {
                                        initial_background_jobs
                                            .contains_key(job_id)
                                            .then_some((*job_id, job))
                                    })
                                }),
                                version,
                            )
                        })
                        .await?;

                    let mv_depended_subscriptions = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?;

                    let stream_actors =
                        build_stream_actors(&info, &recovery_context.job_extra_info)?;

                    let fragment_relations = recovery_context.fragment_relations;

                    // Refresh background job progress for the final snapshot to reflect any catalog changes.
                    let background_jobs = {
                        let mut refreshed_background_jobs = self
                            .list_background_job_progress(None)
                            .await
                            .context("recover background job progress should not fail")?;
                        info.values()
                            .flatten()
                            .filter_map(|(job_id, _)| {
                                refreshed_background_jobs
                                    .remove(job_id)
                                    .map(|definition| (*job_id, definition))
                            })
                            .collect()
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    // get split assignments for all actors
                    let mut source_splits = HashMap::new();
                    for (_, fragment_infos) in info.values().flatten() {
                        for fragment in fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

                    let cdc_table_snapshot_splits =
                        reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
                            .await?;

                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        mv_depended_subscriptions,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_splits,
                    })
                }
            }
        }
    }

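    /// Database-scoped variant of [`Self::reload_runtime_info_impl`]: performs the same cleanup,
    /// re-rendering, and epoch resolution restricted to `database_id`, and returns `None` when
    /// the database has no streaming jobs left to recover.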
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let adaptive_parallelism_strategy = {
            let system_params_reader = self
                .metadata_manager
                .catalog_controller
                .env
                .system_params_reader()
                .await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let mut all_info = self
            .render_actor_assignments(
                Some(database_id),
                &recovery_context.fragment_context,
                &active_streaming_nodes,
                adaptive_parallelism_strategy,
            )
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "render actor assignments failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        recovery_table_with_upstream_sinks(
            &mut database_info,
            &recovery_context.upstream_sink_recovery,
        )?;

        assert!(database_info.len() <= 1);

        let stream_actors = build_stream_actors(&database_info, &recovery_context.job_extra_info)?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let missing_background_jobs = background_jobs
            .keys()
            .filter(|job_id| !info.contains_key(job_id))
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .filter_map(|job_id| info.get(job_id).map(|job| (*job_id, job))),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = recovery_context.fragment_relations;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_splits,
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                job_definition: "definition".to_owned(),
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}