risingwave_meta/barrier/context/recovery.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::change_log::TableChangeLogs;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
    render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{
    SourceChange, StreamFragmentGraph, UpstreamSinkInfo, cleanup_dropped_streaming_jobs,
};

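/// Upstream-sink state reconstructed for a single table job: the fragment that hosts its
/// `UpstreamSinkUnion` operator and the sink upstreams to refill into it.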
#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

#[derive(Debug)]
pub struct LoadedRecoveryContext {
    pub fragment_context: LoadedFragmentContext,
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    pub fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

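/// Per-database output of [`render_runtime_info`]: fragment infos keyed by job, the
/// assembled stream actors, and the source splits assigned to each actor.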
pub struct RenderedDatabaseRuntimeInfo {
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

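/// Renders the runtime info of a single database from a preloaded recovery context:
/// assigns actors to the active workers, refills `UpstreamSinkUnion` operators, and
/// assembles the stream actors and source splits. Returns `Ok(None)` if the database
/// has nothing to recover.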
pub fn render_runtime_info(
    actor_id_generator: &AtomicU32,
    worker_nodes: &ActiveStreamingWorkerNodes,
    recovery_context: &LoadedRecoveryContext,
    database_id: DatabaseId,
) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
    let Some(per_database_context) = recovery_context.fragment_context.for_database(database_id)
    else {
        return Ok(None);
    };

    assert!(!per_database_context.is_empty());

    let RenderedGraph { mut fragments, .. } = render_actor_assignments(
        actor_id_generator,
        worker_nodes.current(),
        &per_database_context,
    )?;

    let single_database = match fragments.remove(&database_id) {
        Some(info) => info,
        None => return Ok(None),
    };

    let mut database_map = HashMap::from([(database_id, single_database)]);
    recovery_table_with_upstream_sinks(
        &mut database_map,
        &recovery_context.upstream_sink_recovery,
    )?;
    let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;

    let job_infos = database_map
        .remove(&database_id)
        .expect("database entry must exist");

    let mut source_splits = HashMap::new();
    for fragment_infos in job_infos.values() {
        for fragment in fragment_infos.values() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }
    }

    Ok(Some(RenderedDatabaseRuntimeInfo {
        job_infos,
        stream_actors,
        source_splits,
    }))
}

/// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the
/// newly added or removed upstreams are not persisted in the meta store. Therefore, when restoring
/// jobs, the information required by the operator must be rebuilt from the current state of its
/// upstreams (sinks) and downstream (table). All necessary metadata must be preloaded before rendering.
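///
/// A minimal sketch of the expected inputs (see the unit test
/// `test_recovery_table_with_upstream_sinks_updates_union_node` below for a runnable version):
///
/// ```ignore
/// // `job_id`, `target_fragment_id`, and `upstream_infos` are assumed to be
/// // loaded from the catalog beforehand.
/// let upstream_sink_recovery = HashMap::from([(
///     job_id,
///     UpstreamSinkRecoveryInfo {
///         target_fragment_id, // fragment holding the `UpstreamSinkUnion` node
///         upstream_infos,
///     },
/// )]);
/// recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery)?;
/// ```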
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow!("duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

/// Assembles `StreamActor` instances from rendered fragment info and job context.
///
/// This function combines the actor assignments from `FragmentRenderMap` with
/// runtime context (timezone, config, definition) from `StreamingJobExtraInfo`
/// to produce the final `StreamActor` structures needed for recovery.
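///
/// # Errors
///
/// Returns an error if any rendered job has no entry in `job_extra_info`.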
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
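    /// Applies the drop/cancel commands pre-applied by the barrier scheduler, cleans up
    /// the affected streaming jobs and state tables, and returns whether any streaming
    /// job was actually dropped.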
    async fn apply_pre_applied_drop_cancel(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<bool> {
        let drop_cancel = self.scheduled_barriers.pre_apply_drop_cancel(database_id);
        let has_drop_streaming_jobs = !drop_cancel.streaming_job_ids.is_empty();
        cleanup_dropped_streaming_jobs(
            &self.refresh_manager,
            &self.hummock_manager,
            &self.metadata_manager,
            drop_cancel.streaming_job_ids,
            drop_cancel.dropped_state_table_ids,
            "drop_streaming_jobs",
        )
        .await?;
        Ok(has_drop_streaming_jobs)
    }

    /// Cleans up catalogs of creating streaming jobs that are in foreground mode or whose
    /// table fragments have not been persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;

        let cleaned_dirty_jobs = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        if database_id.is_some() {
            // Per-database recovery does not run the global Hummock purge below. Unregister the
            // dirty jobs cleaned in this database through the normal dropped-table cleanup path.
            cleanup_dropped_streaming_jobs(
                &self.refresh_manager,
                &self.hummock_manager,
                &self.metadata_manager,
                cleaned_dirty_jobs.streaming_job_ids,
                cleaned_dirty_jobs.dropped_table_ids,
                "clean_dirty_creating_jobs",
            )
            .await?;
        }
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        // Unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: cleaned_dirty_jobs.source_ids,
            })
            .await;

        Ok(())
    }

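    /// Stops the sink coordinators of the given database, or resets the entire sink
    /// manager when no database is specified (global recovery).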
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

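    /// Aborts dirty pending sink state: resolves each pending sink's committed epoch from
    /// its first state table in the current Hummock version and aborts pending sink epochs
    /// based on it.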
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

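    /// Lists the IDs of background-creating streaming jobs whose progress needs to be
    /// recovered, optionally scoped to a single database.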
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mgr = &self.metadata_manager;
        mgr.catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await
    }

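    /// Loads everything recovery needs from the catalog in a single read transaction:
    /// the fragment context, per-job extra info, `UpstreamSinkUnion` recovery info, and
    /// fragment downstream relations.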
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context
            .job_fragments
            .values()
            .flat_map(|fragments| fragments.values())
        {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}: fragment {} and fragment {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    // This should not happen unless catalog changes or legacy metadata are involved.
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context
                    .job_fragments
                    .values()
                    .flat_map(|fragments| fragments.keys().copied())
                    .collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

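    /// Resolves the committed epoch of every state table in the current Hummock version,
    /// plus, for the given background jobs that use snapshot backfill, the change-log
    /// epochs their upstream tables still need to replay. Returns
    /// `(table_committed_epochs, table_log_epochs)`.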
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
        version: &HummockVersion,
        table_change_log: &TableChangeLogs,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter = fragments
                    .values()
                    .flat_map(|fragment| fragment.state_table_ids.iter().copied());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = table_change_log.get(&upstream_table_id) {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // The first log entry lines up with the downstream committed epoch.
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

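    /// Reloads the full runtime info snapshot for global recovery: cleans dirty state,
    /// recovers background job progress, loads the recovery context, and resolves
    /// Hummock epochs for background jobs.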
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!("recovering background job progress");
        let initial_background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self.apply_pre_applied_drop_cancel(None).await?;
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = initial_background_jobs.iter().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?}, total: {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable(false))
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            info!("unreschedulable background jobs: {:?}", unreschedulable_jobs);
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                 This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        }

        // Resolve actor info for recovery. If there's no actor to recover, most of the
        // following steps will be no-op, while the compute nodes will still be reset.
        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
        let mut recovery_context = self.load_recovery_context(None).await?;
        if self.apply_pre_applied_drop_cancel(None).await? {
            recovery_context = self.load_recovery_context(None).await?;
        }

        self.purge_state_table_from_hummock(
            &recovery_context
                .fragment_context
                .job_fragments
                .values()
                .flat_map(|fragments| fragments.values())
                .flat_map(|fragment| fragment.state_table_ids.iter().copied())
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version_and_table_change_log(|version, table_change_log| {
                Self::resolve_hummock_version_epochs(
                    recovery_context
                        .fragment_context
                        .job_fragments
                        .iter()
                        .filter_map(|(job_id, job)| {
                            initial_background_jobs
                                .contains(job_id)
                                .then_some((*job_id, job))
                        }),
                    version,
                    table_change_log,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        // Refresh background job progress for the final snapshot to reflect any catalog changes.
        let background_jobs = {
            let mut refreshed_background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            recovery_context
                .fragment_context
                .job_map
                .keys()
                .filter_map(|job_id| {
                    refreshed_background_jobs.remove(job_id).then_some(*job_id)
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

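    /// Per-database variant of [`Self::reload_runtime_info_impl`]: cleans dirty state,
    /// recovers background job progress, and loads the recovery context for
    /// `database_id` only.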
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self
            .apply_pre_applied_drop_cancel(Some(database_id))
            .await?;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let missing_background_jobs = background_jobs
            .iter()
            .filter(|job_id| {
                !recovery_context
                    .fragment_context
                    .job_map
                    .contains_key(*job_id)
            })
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version_and_table_change_log(|version, table_change_log| {
                Self::resolve_hummock_version_epochs(
                    background_jobs.iter().filter_map(|job_id| {
                        recovery_context
                            .fragment_context
                            .job_fragments
                            .get(job_id)
                            .map(|job| (*job_id, job))
                    }),
                    version,
                    table_change_log,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(DatabaseRuntimeInfoSnapshot {
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            cdc_table_snapshot_splits,
        })
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                job_definition: "definition".to_owned(),
                backfill_orders: None,
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}