risingwave_meta/barrier/context/recovery.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::TableChangeLogs;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
    render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};

#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

#[derive(Debug)]
pub struct LoadedRecoveryContext {
    pub fragment_context: LoadedFragmentContext,
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    pub fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

pub struct RenderedDatabaseRuntimeInfo {
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

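/// Renders the runtime info of a single database from a preloaded recovery context:
/// assigns fresh actor ids across the current worker nodes, refills the
/// `UpstreamSinkUnion` operators of restored tables, and assembles the stream actors
/// and per-actor source splits needed for recovery. Returns `Ok(None)` when the
/// database has nothing to render.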
pub fn render_runtime_info(
    actor_id_generator: &AtomicU32,
    worker_nodes: &ActiveStreamingWorkerNodes,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    recovery_context: &LoadedRecoveryContext,
    database_id: DatabaseId,
) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
    let Some(per_database_context) = recovery_context.fragment_context.for_database(database_id)
    else {
        return Ok(None);
    };

    assert!(!per_database_context.is_empty());

    let RenderedGraph { mut fragments, .. } = render_actor_assignments(
        actor_id_generator,
        worker_nodes.current(),
        adaptive_parallelism_strategy,
        &per_database_context,
    )?;

    let Some(single_database) = fragments.remove(&database_id) else {
        return Ok(None);
    };

    let mut database_map = HashMap::from([(database_id, single_database)]);
    recovery_table_with_upstream_sinks(
        &mut database_map,
        &recovery_context.upstream_sink_recovery,
    )?;
    let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;

    let job_infos = database_map
        .remove(&database_id)
        .expect("database entry must exist");

    let mut source_splits = HashMap::new();
    for fragment_infos in job_infos.values() {
        for fragment in fragment_infos.values() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }
    }

    Ok(Some(RenderedDatabaseRuntimeInfo {
        job_infos,
        stream_actors,
        source_splits,
    }))
}

/// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically,
/// and the newly added or deleted upstreams are not persisted in the meta store. Therefore,
/// when restoring jobs, we need to rebuild the information the operator requires from the
/// current state of its upstreams (sinks) and downstream (table). All necessary metadata
/// must be preloaded before rendering.
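///
/// An illustrative sketch with hypothetical ids (see
/// `test_recovery_table_with_upstream_sinks_updates_union_node` below for a full setup):
///
/// ```ignore
/// // Job 10's table fragment 100 contains an `UpstreamSinkUnion` node with an empty
/// // `init_upstreams`; the catalog currently records one upstream sink for the table.
/// let recovery = HashMap::from([(
///     JobId::new(10),
///     UpstreamSinkRecoveryInfo {
///         target_fragment_id: FragmentId::new(100),
///         upstream_infos: vec![upstream_sink_info],
///     },
/// )]);
/// recovery_table_with_upstream_sinks(&mut inflight_jobs, &recovery)?;
/// // The union node in fragment 100 now lists the sink among its initial upstreams.
/// ```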
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                bail!("duplicate job id found: {}", job_id);
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id)
                else {
                    bail!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    );
                };
                refill_upstream_sink_union_in_table(
                    &mut target_fragment.nodes,
                    &recovery.upstream_infos,
                );
            }
        }
    }

    Ok(())
}

/// Assembles `StreamActor` instances from rendered fragment info and job context.
///
/// This function combines the actor assignments from `FragmentRenderMap` with
/// runtime context (timezone, config, definition) from `StreamingJobExtraInfo`
/// to produce the final `StreamActor` structures needed for recovery.
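///
/// An illustrative sketch with hypothetical values (mirrors
/// `test_build_stream_actors_uses_preloaded_extra_info` below):
///
/// ```ignore
/// let extra = HashMap::from([(job_id, streaming_job_extra_info)]);
/// let actors = build_stream_actors(&rendered_fragments, &extra)?;
/// // Every rendered actor now carries its job's definition, expression context
/// // (e.g. the job timezone) and config override.
/// assert_eq!(actors[&actor_id].mview_definition, "definition");
/// ```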
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs of creating streaming jobs that are in foreground mode,
    /// or whose table fragments have not been persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        // Unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

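    /// Aborts sink state left pending across a restart: resolves, for each pending sink,
    /// the committed epoch of its first state table from the current hummock version,
    /// and passes these epochs to `abort_pending_sink_epochs` for cleanup.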
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mgr = &self.metadata_manager;
        mgr.catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await
    }

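    /// Loads everything required to re-render the streaming graph inside a single read
    /// transaction, so that the fragment context, per-job extra info, upstream-sink
    /// recovery targets, and fragment relations come from one consistent catalog snapshot.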
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

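        // Locate, for each job, the unique fragment whose plan contains an
        // `UpstreamSinkUnion` node; its upstream list must be rebuilt from the
        // catalog during rendering.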
        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context
            .job_fragments
            .values()
            .flat_map(|fragments| fragments.values())
        {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}: fragment {} and fragment {}",
                    fragment.job_id,
                    previous,
                    fragment.fragment_id
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    // This should not happen unless catalog changes or legacy metadata are involved.
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context
                    .job_fragments
                    .values()
                    .flat_map(|fragments| fragments.keys().copied())
                    .collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

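    /// Resolves, from the current hummock `version`, the committed epoch of every state
    /// table, together with the change-log epochs each upstream table must replay for
    /// the recovered background jobs.
    ///
    /// Each snapshot-backfill job pins an upstream table at
    /// `max(snapshot_epoch, job_committed_epoch)`, and the minimum pin across all
    /// downstream jobs wins. A worked example with hypothetical epochs: a job committed
    /// at epoch 90 whose snapshot of upstream table `t` was taken at epoch 100 pins `t`
    /// at `max(100, 90) = 100`; if another job pins `t` at 95 while `t` itself has
    /// committed at 120, the change log from epoch 95 up to 120 is resolved for replay,
    /// and the first resolved checkpoint epoch must equal 95.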
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
        version: &HummockVersion,
        table_change_log: &TableChangeLogs,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter = fragments
                    .values()
                    .flat_map(|fragment| fragment.state_table_ids.iter().copied());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = table_change_log.get(&upstream_table_id) {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let log_starts_at_downstream_epoch = matches!(
                            epochs.first(),
                            Some((_, first_checkpoint_epoch))
                                if *first_checkpoint_epoch == downstream_committed_epoch
                        );
                        if !log_starts_at_downstream_epoch {
                            bail!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!("recovering background job progress");
        let initial_background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;
        tracing::info!("recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = initial_background_jobs.iter().cloned().collect_vec();
        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable(false))
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            info!("unreschedulable background jobs: {:?}", unreschedulable_jobs);
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                 This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        }

        // Resolve actor info for recovery. If there's no actor to recover, most of the
        // following steps will be no-op, while the compute nodes will still be reset.
        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
        let mut recovery_context = self.load_recovery_context(None).await?;
        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            recovery_context = self.load_recovery_context(None).await?;
        }

        self.purge_state_table_from_hummock(
            &recovery_context
                .fragment_context
                .job_fragments
                .values()
                .flat_map(|fragments| fragments.values())
                .flat_map(|fragment| fragment.state_table_ids.iter().copied())
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version_and_table_change_log(|version, table_change_log| {
                Self::resolve_hummock_version_epochs(
                    recovery_context
                        .fragment_context
                        .job_fragments
                        .iter()
                        .filter_map(|(job_id, job)| {
                            initial_background_jobs
                                .contains(job_id)
                                .then_some((*job_id, job))
                        }),
                    version,
                    table_change_log,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        // Refresh background job progress for the final snapshot to reflect any catalog changes.
        let background_jobs = {
            let mut refreshed_background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            recovery_context
                .fragment_context
                .job_map
                .keys()
                .filter_map(|job_id| {
                    refreshed_background_jobs.remove(job_id).then_some(*job_id)
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!(?database_id, "recovering background job progress of database");
        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let missing_background_jobs = background_jobs
            .iter()
            .filter(|job_id| {
                !recovery_context
                    .fragment_context
                    .job_map
                    .contains_key(*job_id)
            })
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version_and_table_change_log(|version, table_change_log| {
                Self::resolve_hummock_version_epochs(
                    background_jobs.iter().filter_map(|job_id| {
                        recovery_context
                            .fragment_context
                            .job_fragments
                            .get(job_id)
                            .map(|job| (*job_id, job))
                    }),
                    version,
                    table_change_log,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(DatabaseRuntimeInfoSnapshot {
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            cdc_table_snapshot_splits,
        })
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                adaptive_parallelism_strategy: None,
                job_definition: "definition".to_owned(),
                backfill_orders: None,
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}