risingwave_meta/barrier/context/recovery.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, max, min};
16use std::collections::hash_map::Entry;
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::num::NonZeroUsize;
19
20use anyhow::{Context, anyhow};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::id::JobId;
25use risingwave_common::system_param::AdaptiveParallelismStrategy;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_meta_model::SinkId;
30use risingwave_pb::stream_plan::stream_node::PbNodeBody;
31use sea_orm::TransactionTrait;
32use thiserror_ext::AsReport;
33use tracing::{info, warn};
34
35use super::BarrierWorkerRuntimeInfoSnapshot;
36use crate::MetaResult;
37use crate::barrier::DatabaseRuntimeInfoSnapshot;
38use crate::barrier::context::GlobalBarrierWorkerContextImpl;
39use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
40use crate::controller::scale::{
41    FragmentRenderMap, LoadedFragmentContext, RenderedGraph, WorkerInfo, render_actor_assignments,
42};
43use crate::controller::utils::StreamingJobExtraInfo;
44use crate::manager::ActiveStreamingWorkerNodes;
45use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
46use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
47use crate::stream::cdc::reload_cdc_table_snapshot_splits;
48use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};
49
50struct UpstreamSinkRecoveryInfo {
51    target_fragment_id: FragmentId,
52    upstream_infos: Vec<UpstreamSinkInfo>,
53}
54
55struct LoadedRecoveryContext {
56    fragment_context: LoadedFragmentContext,
57    job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
58    upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
59    fragment_relations: FragmentDownstreamRelation,
60}
61
62impl LoadedRecoveryContext {
63    fn empty(fragment_context: LoadedFragmentContext) -> Self {
64        Self {
65            fragment_context,
66            job_extra_info: HashMap::new(),
67            upstream_sink_recovery: HashMap::new(),
68            fragment_relations: FragmentDownstreamRelation::default(),
69        }
70    }
71
72    fn backfill_orders(&self) -> HashMap<JobId, HashMap<FragmentId, Vec<FragmentId>>> {
73        self.job_extra_info
74            .iter()
75            .map(|(job_id, extra_info)| {
76                (
77                    *job_id,
78                    extra_info.backfill_orders.clone().unwrap_or_default().0,
79                )
80            })
81            .collect()
82    }
83}
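
// A tiny, self-contained sketch (not part of the original file) of the flattening performed by
// `backfill_orders` above: a job without a recorded order simply contributes an empty map, so
// callers never have to deal with `Option`. All ids here are toy integers.
#[allow(dead_code)]
fn sketch_flatten_backfill_orders() {
    use std::collections::HashMap;

    // job id -> optional (fragment id -> downstream fragment ids in backfill order).
    let per_job: HashMap<u32, Option<HashMap<u32, Vec<u32>>>> = HashMap::from([
        (10, Some(HashMap::from([(100, vec![101, 102])]))),
        (11, None),
    ]);

    let flattened: HashMap<u32, HashMap<u32, Vec<u32>>> = per_job
        .into_iter()
        .map(|(job_id, orders)| (job_id, orders.unwrap_or_default()))
        .collect();

    assert_eq!(flattened[&10][&100], vec![101, 102]);
    assert!(flattened[&11].is_empty());
}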
84
85/// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and its newly added or
86/// deleted upstreams are not persisted in the meta store. Therefore, when restoring jobs, the information required
87/// by the operator must be rebuilt from the current state of its upstreams (sinks) and its downstream (table).
88/// All necessary metadata must be preloaded before rendering.
89fn recovery_table_with_upstream_sinks(
90    inflight_jobs: &mut FragmentRenderMap,
91    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
92) -> MetaResult<()> {
93    if upstream_sink_recovery.is_empty() {
94        return Ok(());
95    }
96
97    let mut seen_jobs = HashSet::new();
98
99    for jobs in inflight_jobs.values_mut() {
100        for (job_id, fragments) in jobs {
101            if !seen_jobs.insert(*job_id) {
102                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
103            }
104
105            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
106                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
107                    refill_upstream_sink_union_in_table(
108                        &mut target_fragment.nodes,
109                        &recovery.upstream_infos,
110                    );
111                } else {
112                    return Err(anyhow::anyhow!(
113                        "target fragment {} not found for upstream sink recovery of job {}",
114                        recovery.target_fragment_id,
115                        job_id
116                    )
117                    .into());
118                }
119            }
120        }
121    }
122
123    Ok(())
124}
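
// A minimal, self-contained sketch (with toy types standing in for `FragmentRenderMap`,
// `UpstreamSinkInfo`, and `refill_upstream_sink_union_in_table`) of the refill step performed
// above: locate the `UpstreamSinkUnion`-like node inside the target fragment of each job and
// overwrite its upstream list with the freshly derived one.
#[allow(dead_code)]
fn sketch_refill_upstream_sink_union() {
    use std::collections::HashMap;

    // Toy stand-in for a fragment's stream node tree.
    enum ToyNode {
        UpstreamSinkUnion { upstream_fragment_ids: Vec<u32> },
        Other { inputs: Vec<ToyNode> },
    }

    // Depth-first search for the union node; returns true once it has been patched.
    fn refill(node: &mut ToyNode, upstreams: &[u32]) -> bool {
        match node {
            ToyNode::UpstreamSinkUnion { upstream_fragment_ids } => {
                *upstream_fragment_ids = upstreams.to_vec();
                true
            }
            ToyNode::Other { inputs } => inputs.iter_mut().any(|input| refill(input, upstreams)),
        }
    }

    // job id -> (target fragment id, root node of that fragment).
    let mut rendered: HashMap<u32, (u32, ToyNode)> = HashMap::from([(
        10,
        (100, ToyNode::Other {
            inputs: vec![ToyNode::UpstreamSinkUnion {
                upstream_fragment_ids: vec![],
            }],
        }),
    )]);

    // Freshly derived upstream sink fragments per job, as the catalog would provide them.
    let derived_upstreams: HashMap<u32, Vec<u32>> = HashMap::from([(10, vec![200, 201])]);

    for (job_id, (_, root)) in rendered.iter_mut() {
        if let Some(upstreams) = derived_upstreams.get(job_id) {
            assert!(refill(root, upstreams), "target fragment must contain the union node");
        }
    }
}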
125
126/// Assembles `StreamActor` instances from rendered fragment info and job context.
127///
128/// This function combines the actor assignments from `FragmentRenderMap` with
129/// runtime context (timezone, config, definition) from `StreamingJobExtraInfo`
130/// to produce the final `StreamActor` structures needed for recovery.
131fn build_stream_actors(
132    all_info: &FragmentRenderMap,
133    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
134) -> MetaResult<HashMap<ActorId, StreamActor>> {
135    let mut stream_actors = HashMap::new();
136
137    for (job_id, streaming_info) in all_info.values().flatten() {
138        let extra_info = job_extra_info
139            .get(job_id)
140            .cloned()
141            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
142        let expr_context = extra_info.stream_context().to_expr_context();
143        let job_definition = extra_info.job_definition;
144        let config_override = extra_info.config_override;
145
146        for (fragment_id, fragment_infos) in streaming_info {
147            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
148                stream_actors.insert(
149                    *actor_id,
150                    StreamActor {
151                        actor_id: *actor_id,
152                        fragment_id: *fragment_id,
153                        vnode_bitmap: vnode_bitmap.clone(),
154                        mview_definition: job_definition.clone(),
155                        expr_context: Some(expr_context.clone()),
156                        config_override: config_override.clone(),
157                    },
158                );
159            }
160        }
161    }
162    Ok(stream_actors)
163}
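
// A small sketch of the assembly above using plain std types: job-level context (here just the
// definition string) is cloned into every actor of that job, while per-actor data (here the
// fragment id) comes from the rendered assignment. Ids are toy integers; the real code also
// attaches the expression context, the vnode bitmap, and the config override.
#[allow(dead_code)]
fn sketch_build_actor_descriptors() {
    use std::collections::HashMap;

    // job id -> fragment id -> actor ids, as produced by the render stage.
    let rendered: HashMap<u32, HashMap<u32, Vec<u32>>> =
        HashMap::from([(10, HashMap::from([(100, vec![1, 2]), (101, vec![3])]))]);
    // job id -> job-level context shared by all of its actors.
    let job_context: HashMap<u32, String> =
        HashMap::from([(10, "CREATE MATERIALIZED VIEW ...".to_owned())]);

    let mut actors: HashMap<u32, (u32, String)> = HashMap::new();
    for (job_id, fragments) in &rendered {
        // The real code errors out on missing job context; the sketch just skips.
        let Some(definition) = job_context.get(job_id) else { continue };
        for (fragment_id, actor_ids) in fragments {
            for actor_id in actor_ids {
                actors.insert(*actor_id, (*fragment_id, definition.clone()));
            }
        }
    }
    assert_eq!(actors.len(), 3);
    assert_eq!(actors[&3].0, 101);
}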
164
165impl GlobalBarrierWorkerContextImpl {
166    /// Clean up catalogs for creating streaming jobs that are in foreground mode or whose table fragments have not been persisted.
167    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
168        self.metadata_manager
169            .catalog_controller
170            .clean_dirty_subscription(database_id)
171            .await?;
172        let dirty_associated_source_ids = self
173            .metadata_manager
174            .catalog_controller
175            .clean_dirty_creating_jobs(database_id)
176            .await?;
177        self.metadata_manager
178            .reset_all_refresh_jobs_to_idle()
179            .await?;
180
181        // unregister cleaned sources.
182        self.source_manager
183            .apply_source_change(SourceChange::DropSource {
184                dropped_source_ids: dirty_associated_source_ids,
185            })
186            .await;
187
188        Ok(())
189    }
190
191    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
192        if let Some(database_id) = database_id {
193            let sink_ids = self
194                .metadata_manager
195                .catalog_controller
196                .list_sink_ids(Some(database_id))
197                .await?;
198            self.sink_manager.stop_sink_coordinator(sink_ids).await;
199        } else {
200            self.sink_manager.reset().await;
201        }
202        Ok(())
203    }
204
205    async fn abort_dirty_pending_sink_state(
206        &self,
207        database_id: Option<DatabaseId>,
208    ) -> MetaResult<()> {
209        let pending_sinks: HashSet<SinkId> = self
210            .metadata_manager
211            .catalog_controller
212            .list_all_pending_sinks(database_id)
213            .await?;
214
215        if pending_sinks.is_empty() {
216            return Ok(());
217        }
218
219        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
220            .metadata_manager
221            .catalog_controller
222            .fetch_sink_with_state_table_ids(pending_sinks)
223            .await?;
224
225        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();
226
227        for (sink_id, table_ids) in sink_with_state_tables {
228            let Some(table_id) = table_ids.first() else {
229                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
230            };
231
232            self.hummock_manager
233                .on_current_version(|version| -> MetaResult<()> {
234                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
235                        assert!(
236                            sink_committed_epoch
237                                .insert(sink_id, committed_epoch)
238                                .is_none()
239                        );
240                        Ok(())
241                    } else {
242                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
243                    }
244                })
245                .await?;
246        }
247
248        self.metadata_manager
249            .catalog_controller
250            .abort_pending_sink_epochs(sink_committed_epoch)
251            .await?;
252
253        Ok(())
254    }
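
    // Self-contained sketch of the epoch lookup above: every pending sink is mapped to the
    // committed epoch of its first state table in a (toy) hummock version, which is what would
    // then be handed to `abort_pending_sink_epochs`. Ids and epochs are plain integers here.
    #[allow(dead_code)]
    fn sketch_resolve_pending_sink_epochs() {
        use std::collections::HashMap;

        // sink id -> state table ids, as loaded from the catalog.
        let sink_state_tables: HashMap<u32, Vec<u32>> =
            HashMap::from([(1, vec![11, 12]), (2, vec![21])]);
        // state table id -> committed epoch, as read from the current hummock version.
        let committed: HashMap<u32, u64> = HashMap::from([(11, 100), (12, 100), (21, 90)]);

        let mut sink_committed_epoch: HashMap<u32, u64> = HashMap::new();
        for (sink_id, table_ids) in &sink_state_tables {
            // The real code errors out when a sink has no state table or no committed epoch.
            let first_table = table_ids.first().expect("sink must have a state table");
            let epoch = *committed.get(first_table).expect("state table must be committed");
            assert!(sink_committed_epoch.insert(*sink_id, epoch).is_none());
        }
        assert_eq!(sink_committed_epoch[&2], 90);
    }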
255
256    async fn purge_state_table_from_hummock(
257        &self,
258        all_state_table_ids: &HashSet<TableId>,
259    ) -> MetaResult<()> {
260        self.hummock_manager.purge(all_state_table_ids).await?;
261        Ok(())
262    }
263
264    async fn list_background_job_progress(
265        &self,
266        database_id: Option<DatabaseId>,
267    ) -> MetaResult<HashSet<JobId>> {
268        let mgr = &self.metadata_manager;
269        mgr.catalog_controller
270            .list_background_creating_jobs(false, database_id)
271            .await
272    }
273
274    /// Sync render stage: uses the loaded context and the currently active workers to produce actor assignments.
275    fn render_actor_assignments(
276        &self,
277        database_id: Option<DatabaseId>,
278        loaded: &LoadedFragmentContext,
279        worker_nodes: &ActiveStreamingWorkerNodes,
280        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
281    ) -> MetaResult<FragmentRenderMap> {
282        if loaded.is_empty() {
283            return Ok(HashMap::new());
284        }
285
286        let available_workers: BTreeMap<_, _> = worker_nodes
287            .current()
288            .values()
289            .filter(|worker| worker.is_streaming_schedulable())
290            .map(|worker| {
291                (
292                    worker.id,
293                    WorkerInfo {
294                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
295                        resource_group: worker.resource_group(),
296                    },
297                )
298            })
299            .collect();
300
301        let RenderedGraph { fragments, .. } = render_actor_assignments(
302            self.metadata_manager
303                .catalog_controller
304                .env
305                .actor_id_generator(),
306            &available_workers,
307            adaptive_parallelism_strategy,
308            loaded,
309        )?;
310
311        if let Some(database_id) = database_id {
312            for loaded_database_id in fragments.keys() {
313                assert_eq!(*loaded_database_id, database_id);
314            }
315        }
316
317        Ok(fragments)
318    }
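
    // The actual placement logic lives in `crate::controller::scale::render_actor_assignments`
    // and honors `AdaptiveParallelismStrategy`; the toy sketch below only shows the flavor of
    // the step: schedulable workers expose a parallelism, and a fragment's actors are spread
    // over the resulting slots in a deterministic order. Numbers are purely illustrative.
    #[allow(dead_code)]
    fn sketch_spread_actors_over_workers() {
        use std::collections::BTreeMap;

        // worker id -> parallelism (slot count), already filtered to schedulable workers.
        let workers: BTreeMap<u32, usize> = BTreeMap::from([(1, 2), (2, 3)]);

        // Flatten the workers into an ordered list of slots, then assign actors round-robin.
        let slots: Vec<u32> = workers
            .iter()
            .flat_map(|(worker_id, parallelism)| std::iter::repeat(*worker_id).take(*parallelism))
            .collect();

        let actor_count: usize = 4;
        let assignment: Vec<(usize, u32)> = (0..actor_count)
            .map(|actor_idx| (actor_idx, slots[actor_idx % slots.len()]))
            .collect();

        // Worker 1 contributes 2 slots and worker 2 contributes 3, so the four actors land on
        // workers 1, 1, 2, 2 in this toy ordering.
        assert_eq!(assignment, vec![(0, 1), (1, 1), (2, 2), (3, 2)]);
    }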
319
320    async fn load_recovery_context(
321        &self,
322        database_id: Option<DatabaseId>,
323    ) -> MetaResult<LoadedRecoveryContext> {
324        let inner = self
325            .metadata_manager
326            .catalog_controller
327            .get_inner_read_guard()
328            .await;
329        let txn = inner.db.begin().await?;
330
331        let fragment_context = self
332            .metadata_manager
333            .catalog_controller
334            .load_fragment_context_in_txn(&txn, database_id)
335            .await
336            .inspect_err(|err| {
337                warn!(error = %err.as_report(), "load fragment context failed");
338            })?;
339
340        if fragment_context.is_empty() {
341            return Ok(LoadedRecoveryContext::empty(fragment_context));
342        }
343
344        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
345        let job_extra_info = self
346            .metadata_manager
347            .catalog_controller
348            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
349            .await?;
350
351        let mut upstream_targets = HashMap::new();
352        for fragment in fragment_context.fragment_map.values() {
353            let mut has_upstream_union = false;
354            visit_stream_node_cont(&fragment.stream_node.to_protobuf(), |node| {
355                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
356                    has_upstream_union = true;
357                    false
358                } else {
359                    true
360                }
361            });
362
363            if has_upstream_union
364                && let Some(previous) =
365                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
366            {
367                bail!(
368                    "multiple upstream sink union fragments found for job {}: fragments {} and {}",
369                    fragment.job_id,
370                    fragment.fragment_id,
371                    previous
372                );
373            }
374        }
375
376        let mut upstream_sink_recovery = HashMap::new();
377        if !upstream_targets.is_empty() {
378            let tables = self
379                .metadata_manager
380                .catalog_controller
381                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
382                .await?;
383
384            for table in tables {
385                let job_id = table.id.as_job_id();
386                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
387                    // This should not happen unless catalog changes or legacy metadata are involved.
388                    tracing::debug!(
389                        job_id = %job_id,
390                        "upstream sink union target fragment not found for table"
391                    );
392                    continue;
393                };
394
395                let upstream_infos = self
396                    .metadata_manager
397                    .catalog_controller
398                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
399                    .await?;
400
401                upstream_sink_recovery.insert(
402                    job_id,
403                    UpstreamSinkRecoveryInfo {
404                        target_fragment_id: *target_fragment_id,
405                        upstream_infos,
406                    },
407                );
408            }
409        }
410
411        let fragment_relations = self
412            .metadata_manager
413            .catalog_controller
414            .get_fragment_downstream_relations_in_txn(
415                &txn,
416                fragment_context.fragment_map.keys().copied().collect_vec(),
417            )
418            .await?;
419
420        Ok(LoadedRecoveryContext {
421            fragment_context,
422            job_extra_info,
423            upstream_sink_recovery,
424            fragment_relations,
425        })
426    }
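
    // Sketch of the early-exit scan used above: `visit_stream_node_cont` walks the operator tree
    // and stops as soon as the closure returns `false`. The toy visitor below mimics that
    // contract on a minimal tree type so the control flow is easy to see in isolation.
    #[allow(dead_code)]
    fn sketch_early_exit_node_scan() {
        // Minimal stand-in for a stream node: a tag plus child inputs.
        struct ToyNode {
            is_upstream_sink_union: bool,
            inputs: Vec<ToyNode>,
        }

        // Pre-order walk that aborts once the visitor returns `false`.
        fn visit_cont(node: &ToyNode, visit: &mut impl FnMut(&ToyNode) -> bool) -> bool {
            if !visit(node) {
                return false;
            }
            for input in &node.inputs {
                if !visit_cont(input, visit) {
                    return false;
                }
            }
            true
        }

        let tree = ToyNode {
            is_upstream_sink_union: false,
            inputs: vec![
                ToyNode { is_upstream_sink_union: true, inputs: vec![] },
                ToyNode { is_upstream_sink_union: false, inputs: vec![] },
            ],
        };

        let mut found = false;
        visit_cont(&tree, &mut |node: &ToyNode| {
            if node.is_upstream_sink_union {
                found = true;
                false // stop walking: we only need to know the fragment contains the node
            } else {
                true
            }
        });
        assert!(found);
    }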
427
428    #[expect(clippy::type_complexity)]
429    fn resolve_hummock_version_epochs(
430        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
431        version: &HummockVersion,
432    ) -> MetaResult<(
433        HashMap<TableId, u64>,
434        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
435    )> {
436        let table_committed_epoch: HashMap<_, _> = version
437            .state_table_info
438            .info()
439            .iter()
440            .map(|(table_id, info)| (*table_id, info.committed_epoch))
441            .collect();
442        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
443            Ok(*table_committed_epoch
444                .get(&table_id)
445                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
446        };
447        let mut min_downstream_committed_epochs = HashMap::new();
448        for (job_id, fragments) in background_jobs {
449            let job_committed_epoch = {
450                let mut table_id_iter =
451                    InflightFragmentInfo::existing_table_ids(fragments.values());
452                let Some(first_table_id) = table_id_iter.next() else {
453                    bail!("job {} has no state table", job_id);
454                };
455                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
456                for table_id in table_id_iter {
457                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
458                    if job_committed_epoch != table_committed_epoch {
459                        bail!(
460                            "table {} has committed epoch {} different from other table {} with committed epoch {} in job {}",
461                            first_table_id,
462                            job_committed_epoch,
463                            table_id,
464                            table_committed_epoch,
465                            job_id
466                        );
467                    }
468                }
469
470                job_committed_epoch
471            };
472            if let (Some(snapshot_backfill_info), _) =
473                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
474                    fragments
475                        .values()
476                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
477                )?
478            {
479                for (upstream_table, snapshot_epoch) in
480                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
481                {
482                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
483                        anyhow!(
484                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
485                            job_id, upstream_table
486                        )
487                    })?;
488                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
489                    match min_downstream_committed_epochs.entry(upstream_table) {
490                        Entry::Occupied(entry) => {
491                            let prev_min_epoch = entry.into_mut();
492                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
493                        }
494                        Entry::Vacant(entry) => {
495                            entry.insert(pinned_epoch);
496                        }
497                    }
498                }
499            }
500        }
501        let mut log_epochs = HashMap::new();
502        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
503            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
504            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
505                Ordering::Less => {
506                    bail!(
507                        "downstream epoch {} later than upstream epoch {} of table {}",
508                        downstream_committed_epoch,
509                        upstream_committed_epoch,
510                        upstream_table_id
511                    );
512                }
513                Ordering::Equal => {
514                    continue;
515                }
516                Ordering::Greater => {
517                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
518                    {
519                        let epochs = table_change_log
520                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
521                            .map(|epoch_log| {
522                                (
523                                    epoch_log.non_checkpoint_epochs.clone(),
524                                    epoch_log.checkpoint_epoch,
525                                )
526                            })
527                            .collect_vec();
528                        let first_epochs = epochs.first();
529                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
530                            && *first_checkpoint_epoch == downstream_committed_epoch
531                        {
532                        } else {
533                            bail!(
534                                "resolved log epochs {:?} on table {} do not start at downstream committed epoch {}",
535                                epochs,
536                                upstream_table_id,
537                                downstream_committed_epoch
538                            );
539                        }
540                        log_epochs
541                            .try_insert(upstream_table_id, epochs)
542                            .expect("non-duplicated");
543                    } else {
544                        bail!(
545                            "upstream table {} at epoch {} has a downstream lagging at epoch {} but no table change log",
546                            upstream_table_id,
547                            upstream_committed_epoch,
548                            downstream_committed_epoch
549                        );
550                    }
551                }
552            }
553        }
554        Ok((table_committed_epoch, log_epochs))
555    }
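
    // Worked toy example of the pinning arithmetic above, with epochs as plain u64s: each
    // snapshot-backfill job pins `max(snapshot_epoch, job_committed_epoch)` on its upstream
    // table, the upstream keeps the minimum pinned epoch across all of its downstream jobs, and
    // any gap up to the upstream's own committed epoch must be covered by the table change log.
    #[allow(dead_code)]
    fn sketch_epoch_pinning() {
        use std::cmp::{max, min};

        // (job committed epoch, snapshot epoch on the shared upstream table) per downstream job.
        let jobs = [(100u64, 120u64), (150, 110)];

        let mut min_downstream_committed_epoch: Option<u64> = None;
        for (job_committed_epoch, snapshot_epoch) in jobs {
            let pinned = max(snapshot_epoch, job_committed_epoch);
            min_downstream_committed_epoch = Some(match min_downstream_committed_epoch {
                Some(prev) => min(prev, pinned),
                None => pinned,
            });
        }
        // Job 1 pins max(120, 100) = 120 and job 2 pins max(110, 150) = 150, so the upstream
        // must retain change-log entries starting from the smaller of the two.
        assert_eq!(min_downstream_committed_epoch, Some(120));

        // With the upstream table already committed at epoch 180 (> 120), the change log has to
        // cover the span between the downstream's epoch and the upstream's epoch so the lagging
        // downstream can replay it; equal epochs need no replay, and a smaller upstream epoch
        // would be an error, since a downstream can never be ahead of its upstream.
        let upstream_committed_epoch: u64 = 180;
        assert!(upstream_committed_epoch > min_downstream_committed_epoch.unwrap());
    }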
556
557    pub(super) async fn reload_runtime_info_impl(
558        &self,
559    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
560        {
561            {
562                {
563                    self.clean_dirty_streaming_jobs(None)
564                        .await
565                        .context("clean dirty streaming jobs")?;
566
567                    self.reset_sink_coordinator(None)
568                        .await
569                        .context("reset sink coordinator")?;
570                    self.abort_dirty_pending_sink_state(None)
571                        .await
572                        .context("abort dirty pending sink state")?;
573
574                    // Background job progress needs to be recovered.
575                    tracing::info!("recovering background job progress");
576                    let initial_background_jobs = self
577                        .list_background_job_progress(None)
578                        .await
579                        .context("recover background job progress should not fail")?;
580
581                    tracing::info!("recovered background job progress");
582
583                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
584                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
585                    self.metadata_manager
586                        .catalog_controller
587                        .cleanup_dropped_tables()
588                        .await;
589
590                    let adaptive_parallelism_strategy = {
591                        let system_params_reader = self
592                            .metadata_manager
593                            .catalog_controller
594                            .env
595                            .system_params_reader()
596                            .await;
597                        system_params_reader.adaptive_parallelism_strategy()
598                    };
599
600                    let active_streaming_nodes =
601                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
602                            .await?;
603
604                    let background_streaming_jobs =
605                        initial_background_jobs.iter().cloned().collect_vec();
606
607                    tracing::info!(
608                        "background streaming jobs: {:?} total {}",
609                        background_streaming_jobs,
610                        background_streaming_jobs.len()
611                    );
612
613                    let unreschedulable_jobs = {
614                        let mut unreschedulable_jobs = HashSet::new();
615
616                        for job_id in background_streaming_jobs {
617                            let scan_types = self
618                                .metadata_manager
619                                .get_job_backfill_scan_types(job_id)
620                                .await?;
621
622                            if scan_types
623                                .values()
624                                .any(|scan_type| !scan_type.is_reschedulable())
625                            {
626                                unreschedulable_jobs.insert(job_id);
627                            }
628                        }
629
630                        unreschedulable_jobs
631                    };
632
633                    if !unreschedulable_jobs.is_empty() {
634                        info!(
635                            "unreschedulable background jobs: {:?}",
636                            unreschedulable_jobs
637                        );
638                    }
639
640                    // Resolve actor info for recovery. If there's no actor to recover, most of the
641                    // following steps will be no-op, while the compute nodes will still be reset.
642                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
643                    if !unreschedulable_jobs.is_empty() {
644                        bail!(
645                            "Recovery for unreschedulable background jobs is not yet implemented. \
646                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
647                            unreschedulable_jobs
648                        );
649                    }
650
651                    info!("trigger offline re-rendering");
652                    let mut recovery_context = self.load_recovery_context(None).await?;
653
654                    let mut info = self
655                        .render_actor_assignments(
656                            None,
657                            &recovery_context.fragment_context,
658                            &active_streaming_nodes,
659                            adaptive_parallelism_strategy,
660                        )
661                        .inspect_err(|err| {
662                            warn!(error = %err.as_report(), "render actor assignments failed");
663                        })?;
664
665                    info!("offline re-rendering completed");
666
667                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
668                    if !dropped_table_ids.is_empty() {
669                        self.metadata_manager
670                            .catalog_controller
671                            .complete_dropped_tables(dropped_table_ids)
672                            .await;
673                        recovery_context = self.load_recovery_context(None).await?;
674                        info = self
675                            .render_actor_assignments(
676                                None,
677                                &recovery_context.fragment_context,
678                                &active_streaming_nodes,
679                                adaptive_parallelism_strategy,
680                            )
681                            .inspect_err(|err| {
682                                warn!(error = %err.as_report(), "render actor assignments failed");
683                            })?
684                    }
685
686                    recovery_table_with_upstream_sinks(
687                        &mut info,
688                        &recovery_context.upstream_sink_recovery,
689                    )?;
690
691                    let info = info;
692
693                    self.purge_state_table_from_hummock(
694                        &info
695                            .values()
696                            .flatten()
697                            .flat_map(|(_, fragments)| {
698                                InflightFragmentInfo::existing_table_ids(fragments.values())
699                            })
700                            .collect(),
701                    )
702                    .await
703                    .context("purge state table from hummock")?;
704
705                    let (state_table_committed_epochs, state_table_log_epochs) = self
706                        .hummock_manager
707                        .on_current_version(|version| {
708                            Self::resolve_hummock_version_epochs(
709                                info.values().flat_map(|jobs| {
710                                    jobs.iter().filter_map(|(job_id, job)| {
711                                        initial_background_jobs
712                                            .contains(job_id)
713                                            .then_some((*job_id, job))
714                                    })
715                                }),
716                                version,
717                            )
718                        })
719                        .await?;
720
721                    let mv_depended_subscriptions = self
722                        .metadata_manager
723                        .get_mv_depended_subscriptions(None)
724                        .await?;
725
726                    let stream_actors =
727                        build_stream_actors(&info, &recovery_context.job_extra_info)?;
728
729                    let backfill_orders = recovery_context.backfill_orders();
730                    let fragment_relations = recovery_context.fragment_relations;
731
732                    // Refresh background job progress for the final snapshot to reflect any catalog changes.
733                    let background_jobs = {
734                        let mut refreshed_background_jobs = self
735                            .list_background_job_progress(None)
736                            .await
737                            .context("recover background job progress should not fail")?;
738                        info.values()
739                            .flatten()
740                            .filter_map(|(job_id, _)| {
741                                refreshed_background_jobs.remove(job_id).then_some(*job_id)
742                            })
743                            .collect()
744                    };
745
746                    let database_infos = self
747                        .metadata_manager
748                        .catalog_controller
749                        .list_databases()
750                        .await?;
751
752                    // get split assignments for all actors
753                    let mut source_splits = HashMap::new();
754                    for (_, fragment_infos) in info.values().flatten() {
755                        for fragment in fragment_infos.values() {
756                            for (actor_id, info) in &fragment.actors {
757                                source_splits.insert(*actor_id, info.splits.clone());
758                            }
759                        }
760                    }
761
762                    let cdc_table_snapshot_splits =
763                        reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
764                            .await?;
765
766                    Ok(BarrierWorkerRuntimeInfoSnapshot {
767                        active_streaming_nodes,
768                        database_job_infos: info,
769                        backfill_orders,
770                        state_table_committed_epochs,
771                        state_table_log_epochs,
772                        mv_depended_subscriptions,
773                        stream_actors,
774                        fragment_relations,
775                        source_splits,
776                        background_jobs,
777                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
778                        database_infos,
779                        cdc_table_snapshot_splits,
780                    })
781                }
782            }
783        }
784    }
785
786    pub(super) async fn reload_database_runtime_info_impl(
787        &self,
788        database_id: DatabaseId,
789    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
790        self.clean_dirty_streaming_jobs(Some(database_id))
791            .await
792            .context("clean dirty streaming jobs")?;
793
794        self.reset_sink_coordinator(Some(database_id))
795            .await
796            .context("reset sink coordinator")?;
797        self.abort_dirty_pending_sink_state(Some(database_id))
798            .await
799            .context("abort dirty pending sink state")?;
800
801        // Background job progress needs to be recovered.
802        tracing::info!(
803            ?database_id,
804            "recovering background job progress of database"
805        );
806
807        let background_jobs = self
808            .list_background_job_progress(Some(database_id))
809            .await
810            .context("recover background job progress of database should not fail")?;
811        tracing::info!(?database_id, "recovered background job progress");
812
813        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
814        let dropped_table_ids = self
815            .scheduled_barriers
816            .pre_apply_drop_cancel(Some(database_id));
817        self.metadata_manager
818            .catalog_controller
819            .complete_dropped_tables(dropped_table_ids)
820            .await;
821
822        let adaptive_parallelism_strategy = {
823            let system_params_reader = self
824                .metadata_manager
825                .catalog_controller
826                .env
827                .system_params_reader()
828                .await;
829            system_params_reader.adaptive_parallelism_strategy()
830        };
831
832        let active_streaming_nodes =
833            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;
834
835        let recovery_context = self.load_recovery_context(Some(database_id)).await?;
836
837        let mut all_info = self
838            .render_actor_assignments(
839                Some(database_id),
840                &recovery_context.fragment_context,
841                &active_streaming_nodes,
842                adaptive_parallelism_strategy,
843            )
844            .inspect_err(|err| {
845                warn!(error = %err.as_report(), "render actor assignments failed");
846            })?;
847
848        let mut database_info = all_info
849            .remove(&database_id)
850            .map_or_else(HashMap::new, |table_map| {
851                HashMap::from([(database_id, table_map)])
852            });
853
854        recovery_table_with_upstream_sinks(
855            &mut database_info,
856            &recovery_context.upstream_sink_recovery,
857        )?;
858
859        assert!(database_info.len() <= 1);
860
861        let stream_actors = build_stream_actors(&database_info, &recovery_context.job_extra_info)?;
862
863        let Some(info) = database_info
864            .into_iter()
865            .next()
866            .map(|(loaded_database_id, info)| {
867                assert_eq!(loaded_database_id, database_id);
868                info
869            })
870        else {
871            return Ok(None);
872        };
873
874        let missing_background_jobs = background_jobs
875            .iter()
876            .filter(|job_id| !info.contains_key(job_id))
877            .copied()
878            .collect_vec();
879        if !missing_background_jobs.is_empty() {
880            warn!(
881                database_id = %database_id,
882                missing_job_ids = ?missing_background_jobs,
883                "background jobs missing in rendered info"
884            );
885        }
886
887        let (state_table_committed_epochs, state_table_log_epochs) = self
888            .hummock_manager
889            .on_current_version(|version| {
890                Self::resolve_hummock_version_epochs(
891                    background_jobs
892                        .iter()
893                        .filter_map(|job_id| info.get(job_id).map(|job| (*job_id, job))),
894                    version,
895                )
896            })
897            .await?;
898
899        let mv_depended_subscriptions = self
900            .metadata_manager
901            .get_mv_depended_subscriptions(Some(database_id))
902            .await?;
903
904        let backfill_orders = recovery_context.backfill_orders();
905        let fragment_relations = recovery_context.fragment_relations;
906
907        // get split assignments for all actors
908        let mut source_splits = HashMap::new();
909        for (_, fragment) in info.values().flatten() {
910            for (actor_id, info) in &fragment.actors {
911                source_splits.insert(*actor_id, info.splits.clone());
912            }
913        }
914
915        let cdc_table_snapshot_splits =
916            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
917                .await?;
918
919        self.refresh_manager
920            .remove_trackers_by_database(database_id);
921
922        Ok(Some(DatabaseRuntimeInfoSnapshot {
923            job_infos: info,
924            backfill_orders,
925            state_table_committed_epochs,
926            state_table_log_epochs,
927            mv_depended_subscriptions,
928            stream_actors,
929            fragment_relations,
930            source_splits,
931            background_jobs,
932            cdc_table_snapshot_splits,
933        }))
934    }
935}
936
937#[cfg(test)]
938mod tests {
939    use std::collections::HashMap;
940
941    use risingwave_common::catalog::FragmentTypeMask;
942    use risingwave_common::id::WorkerId;
943    use risingwave_meta_model::DispatcherType;
944    use risingwave_meta_model::fragment::DistributionType;
945    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
946    use risingwave_pb::stream_plan::{
947        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
948    };
949
950    use super::*;
951    use crate::controller::fragment::InflightActorInfo;
952    use crate::model::DownstreamFragmentRelation;
953    use crate::stream::UpstreamSinkInfo;
954
955    #[test]
956    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
957        let database_id = DatabaseId::new(1);
958        let job_id = JobId::new(10);
959        let fragment_id = FragmentId::new(100);
960        let sink_fragment_id = FragmentId::new(200);
961
962        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
963        let fragment = InflightFragmentInfo {
964            fragment_id,
965            distribution_type: DistributionType::Hash,
966            fragment_type_mask: FragmentTypeMask::empty(),
967            vnode_count: 1,
968            nodes: PbStreamNode {
969                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
970                    PbUpstreamSinkUnionNode {
971                        init_upstreams: vec![],
972                    },
973                ))),
974                ..Default::default()
975            },
976            actors: HashMap::new(),
977            state_table_ids: HashSet::new(),
978        };
979
980        inflight_jobs
981            .entry(database_id)
982            .or_default()
983            .entry(job_id)
984            .or_default()
985            .insert(fragment_id, fragment);
986
987        let upstream_sink_recovery = HashMap::from([(
988            job_id,
989            UpstreamSinkRecoveryInfo {
990                target_fragment_id: fragment_id,
991                upstream_infos: vec![UpstreamSinkInfo {
992                    sink_id: SinkId::new(1),
993                    sink_fragment_id,
994                    sink_output_fields: vec![],
995                    sink_original_target_columns: vec![],
996                    project_exprs: vec![],
997                    new_sink_downstream: DownstreamFragmentRelation {
998                        downstream_fragment_id: FragmentId::new(300),
999                        dispatcher_type: DispatcherType::Hash,
1000                        dist_key_indices: vec![],
1001                        output_mapping: PbDispatchOutputMapping::default(),
1002                    },
1003                }],
1004            },
1005        )]);
1006
1007        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();
1008
1009        let updated = inflight_jobs
1010            .get(&database_id)
1011            .unwrap()
1012            .get(&job_id)
1013            .unwrap()
1014            .get(&fragment_id)
1015            .unwrap();
1016
1017        let PbNodeBody::UpstreamSinkUnion(updated_union) =
1018            updated.nodes.node_body.as_ref().unwrap()
1019        else {
1020            panic!("expected upstream sink union node");
1021        };
1022
1023        assert_eq!(updated_union.init_upstreams.len(), 1);
1024        assert_eq!(
1025            updated_union.init_upstreams[0].upstream_fragment_id,
1026            sink_fragment_id.as_raw_id()
1027        );
1028    }
1029
1030    #[test]
1031    fn test_build_stream_actors_uses_preloaded_extra_info() {
1032        let database_id = DatabaseId::new(2);
1033        let job_id = JobId::new(20);
1034        let fragment_id = FragmentId::new(120);
1035        let actor_id = ActorId::new(500);
1036
1037        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
1038        inflight_jobs
1039            .entry(database_id)
1040            .or_default()
1041            .entry(job_id)
1042            .or_default()
1043            .insert(
1044                fragment_id,
1045                InflightFragmentInfo {
1046                    fragment_id,
1047                    distribution_type: DistributionType::Hash,
1048                    fragment_type_mask: FragmentTypeMask::empty(),
1049                    vnode_count: 1,
1050                    nodes: PbStreamNode::default(),
1051                    actors: HashMap::from([(
1052                        actor_id,
1053                        InflightActorInfo {
1054                            worker_id: WorkerId::new(1),
1055                            vnode_bitmap: None,
1056                            splits: vec![],
1057                        },
1058                    )]),
1059                    state_table_ids: HashSet::new(),
1060                },
1061            );
1062
1063        let job_extra_info = HashMap::from([(
1064            job_id,
1065            StreamingJobExtraInfo {
1066                timezone: Some("UTC".to_owned()),
1067                config_override: "cfg".into(),
1068                job_definition: "definition".to_owned(),
1069                backfill_orders: None,
1070            },
1071        )]);
1072
1073        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();
1074
1075        let actor = stream_actors.get(&actor_id).unwrap();
1076        assert_eq!(actor.actor_id, actor_id);
1077        assert_eq!(actor.fragment_id, fragment_id);
1078        assert_eq!(actor.mview_definition, "definition");
1079        assert_eq!(&*actor.config_override, "cfg");
1080        let expr_ctx = actor.expr_context.as_ref().unwrap();
1081        assert_eq!(expr_ctx.time_zone, "UTC");
1082    }
1083}