risingwave_meta/barrier/context/recovery.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use sea_orm::TransactionTrait;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
    render_actor_assignments,
};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph, UpstreamSinkInfo};

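/// Per-job information used to restore the `UpstreamSinkUnion` operator during recovery: the
/// fragment hosting the operator and the upstream sink descriptors to refill into it.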
#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    target_fragment_id: FragmentId,
    upstream_infos: Vec<UpstreamSinkInfo>,
}

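/// Metadata preloaded in a single transaction for recovery: the fragment topology, per-job extra
/// info (definition, config, stream context), `UpstreamSinkUnion` refill info, and fragment
/// downstream relations.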
#[derive(Debug)]
pub struct LoadedRecoveryContext {
    pub fragment_context: LoadedFragmentContext,
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
    pub fragment_relations: FragmentDownstreamRelation,
}

impl LoadedRecoveryContext {
    fn empty(fragment_context: LoadedFragmentContext) -> Self {
        Self {
            fragment_context,
            job_extra_info: HashMap::new(),
            upstream_sink_recovery: HashMap::new(),
            fragment_relations: FragmentDownstreamRelation::default(),
        }
    }
}

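/// Per-database output of rendering the recovery context: inflight fragment info grouped by job,
/// the assembled stream actors, and the source splits assigned to each actor.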
pub struct RenderedDatabaseRuntimeInfo {
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    pub stream_actors: HashMap<ActorId, StreamActor>,
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

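/// Renders the runtime info of `database_id` from the preloaded recovery context: assigns actors
/// to the currently active worker nodes, refills `UpstreamSinkUnion` operators, and assembles the
/// stream actors and per-actor source splits. Returns `Ok(None)` if the database has no fragments
/// to recover.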
pub fn render_runtime_info(
    actor_id_generator: &AtomicU32,
    worker_nodes: &ActiveStreamingWorkerNodes,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    recovery_context: &LoadedRecoveryContext,
    database_id: DatabaseId,
) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
    let Some(per_database_context) = recovery_context.fragment_context.for_database(database_id)
    else {
        return Ok(None);
    };

    assert!(!per_database_context.is_empty());

    let RenderedGraph { mut fragments, .. } = render_actor_assignments(
        actor_id_generator,
        worker_nodes.current(),
        adaptive_parallelism_strategy,
        &per_database_context,
    )?;

    let single_database = match fragments.remove(&database_id) {
        Some(info) => info,
        None => return Ok(None),
    };

    let mut database_map = HashMap::from([(database_id, single_database)]);
    recovery_table_with_upstream_sinks(
        &mut database_map,
        &recovery_context.upstream_sink_recovery,
    )?;
    let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;

    let job_infos = database_map
        .remove(&database_id)
        .expect("database entry must exist");

    let mut source_splits = HashMap::new();
    for fragment_infos in job_infos.values() {
        for fragment in fragment_infos.values() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }
    }

    Ok(Some(RenderedDatabaseRuntimeInfo {
        job_infos,
        stream_actors,
        source_splits,
    }))
}

/// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically and the
/// newly added or removed upstreams are not persisted in the meta store. Therefore, when restoring
/// jobs, the information required by the operator must be rebuilt from the current state of its
/// upstream sinks and downstream table. All necessary metadata must be preloaded before rendering.
fn recovery_table_with_upstream_sinks(
    inflight_jobs: &mut FragmentRenderMap,
    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
) -> MetaResult<()> {
    if upstream_sink_recovery.is_empty() {
        return Ok(());
    }

    let mut seen_jobs = HashSet::new();

    for jobs in inflight_jobs.values_mut() {
        for (job_id, fragments) in jobs {
            if !seen_jobs.insert(*job_id) {
                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
            }

            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
                    refill_upstream_sink_union_in_table(
                        &mut target_fragment.nodes,
                        &recovery.upstream_infos,
                    );
                } else {
                    return Err(anyhow::anyhow!(
                        "target fragment {} not found for upstream sink recovery of job {}",
                        recovery.target_fragment_id,
                        job_id
                    )
                    .into());
                }
            }
        }
    }

    Ok(())
}

/// Assembles `StreamActor` instances from rendered fragment info and job context.
///
/// This function combines the actor assignments from `FragmentRenderMap` with
/// runtime context (timezone, config, definition) from `StreamingJobExtraInfo`
/// to produce the final `StreamActor` structures needed for recovery.
fn build_stream_actors(
    all_info: &FragmentRenderMap,
    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
) -> MetaResult<HashMap<ActorId, StreamActor>> {
    let mut stream_actors = HashMap::new();

    for (job_id, streaming_info) in all_info.values().flatten() {
        let extra_info = job_extra_info
            .get(job_id)
            .cloned()
            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
        let expr_context = extra_info.stream_context().to_expr_context();
        let job_definition = extra_info.job_definition;
        let config_override = extra_info.config_override;

        for (fragment_id, fragment_infos) in streaming_info {
            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                stream_actors.insert(
                    *actor_id,
                    StreamActor {
                        actor_id: *actor_id,
                        fragment_id: *fragment_id,
                        vnode_bitmap: vnode_bitmap.clone(),
                        mview_definition: job_definition.clone(),
                        expr_context: Some(expr_context.clone()),
                        config_override: config_override.clone(),
                    },
                );
            }
        }
    }
    Ok(stream_actors)
}

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs of creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

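    /// Stops the sink coordinators of the given database, or resets all sink coordinators when no
    /// database is specified.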
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

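    /// Aborts the dirty pending state of sinks: for each pending sink, resolves the committed
    /// epoch of its first state table from the current hummock version and uses it to abort the
    /// sink's pending epochs.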
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

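    /// Lists the background creating streaming jobs (optionally scoped to a database) whose
    /// progress needs to be recovered.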
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashSet<JobId>> {
        let mgr = &self.metadata_manager;
        mgr.catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await
    }

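    /// Loads all metadata needed to rebuild runtime info within a single read transaction: the
    /// fragment context, per-job extra info, `UpstreamSinkUnion` refill info for tables fed by
    /// sinks, and fragment downstream relations.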
    async fn load_recovery_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedRecoveryContext> {
        let inner = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let txn = inner.db.begin().await?;

        let fragment_context = self
            .metadata_manager
            .catalog_controller
            .load_fragment_context_in_txn(&txn, database_id)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        if fragment_context.is_empty() {
            return Ok(LoadedRecoveryContext::empty(fragment_context));
        }

        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
            .await?;

        let mut upstream_targets = HashMap::new();
        for fragment in fragment_context
            .job_fragments
            .values()
            .flat_map(|fragments| fragments.values())
        {
            let mut has_upstream_union = false;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                    has_upstream_union = true;
                    false
                } else {
                    true
                }
            });

            if has_upstream_union
                && let Some(previous) =
                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
            {
                bail!(
                    "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
                    fragment.job_id,
                    fragment.fragment_id,
                    previous
                );
            }
        }

        let mut upstream_sink_recovery = HashMap::new();
        if !upstream_targets.is_empty() {
            let tables = self
                .metadata_manager
                .catalog_controller
                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
                .await?;

            for table in tables {
                let job_id = table.id.as_job_id();
                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
                    // This should not happen unless catalog changes or legacy metadata are involved.
                    tracing::debug!(
                        job_id = %job_id,
                        "upstream sink union target fragment not found for table"
                    );
                    continue;
                };

                let upstream_infos = self
                    .metadata_manager
                    .catalog_controller
                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
                    .await?;

                upstream_sink_recovery.insert(
                    job_id,
                    UpstreamSinkRecoveryInfo {
                        target_fragment_id: *target_fragment_id,
                        upstream_infos,
                    },
                );
            }
        }

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations_in_txn(
                &txn,
                fragment_context
                    .job_fragments
                    .values()
                    .flat_map(|fragments| fragments.keys().copied())
                    .collect_vec(),
            )
            .await?;

        Ok(LoadedRecoveryContext {
            fragment_context,
            job_extra_info,
            upstream_sink_recovery,
            fragment_relations,
        })
    }

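    /// Resolves, from the given hummock version, the committed epoch of every state table, plus
    /// the change-log epochs that snapshot-backfill background jobs still need to replay from
    /// their upstream tables.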
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
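        // For each snapshot-backfill background job, compute the epoch its upstream mv tables must
        // be pinned at: the later of the job's committed epoch and the filled snapshot epoch,
        // minimized across all downstream jobs sharing the same upstream table.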
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter = fragments
                    .values()
                    .flat_map(|fragment| fragment.state_table_ids.iter().copied());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
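        // For each pinned upstream table, collect the change-log epochs between the downstream's
        // committed epoch and the upstream's committed epoch, so that the lagging downstream can
        // catch up by replaying the log.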
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

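    /// Reloads the full runtime info snapshot used for global recovery: cleans up dirty jobs and
    /// sink state, recovers background job progress, purges stale state tables from hummock,
    /// resolves committed and change-log epochs, and collects subscriptions, database info and
    /// cdc table snapshot splits.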
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    self.reset_sink_coordinator(None)
                        .await
                        .context("reset sink coordinator")?;
                    self.abort_dirty_pending_sink_state(None)
                        .await
                        .context("abort dirty pending sink state")?;

                    // Background job progress needs to be recovered.
                    tracing::info!("recovering background job progress");
                    let initial_background_jobs = self
                        .list_background_job_progress(None)
                        .await
                        .context("recover background job progress should not fail")?;

                    tracing::info!("recovered background job progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs =
                        initial_background_jobs.iter().cloned().collect_vec();

                    tracing::info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable(false))
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there are no actors to recover, most of the
                    // following steps are no-ops, while the compute nodes will still be reset.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    if !unreschedulable_jobs.is_empty() {
                        bail!(
                            "Recovery for unreschedulable background jobs is not yet implemented. \
                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                            unreschedulable_jobs
                        );
                    }

                    let mut recovery_context = self.load_recovery_context(None).await?;
                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(dropped_table_ids)
                            .await;
                        recovery_context = self.load_recovery_context(None).await?;
                    }

                    self.purge_state_table_from_hummock(
                        &recovery_context
                            .fragment_context
                            .job_fragments
                            .values()
                            .flat_map(|fragments| fragments.values())
                            .flat_map(|fragment| fragment.state_table_ids.iter().copied())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(
                                recovery_context
                                    .fragment_context
                                    .job_fragments
                                    .iter()
                                    .filter_map(|(job_id, job)| {
                                        initial_background_jobs
                                            .contains(job_id)
                                            .then_some((*job_id, job))
                                    }),
                                version,
                            )
                        })
                        .await?;

                    let mv_depended_subscriptions = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?;

                    // Refresh background job progress for the final snapshot to reflect any catalog changes.
                    let background_jobs = {
                        let mut refreshed_background_jobs = self
                            .list_background_job_progress(None)
                            .await
                            .context("recover background job progress should not fail")?;
                        recovery_context
                            .fragment_context
                            .job_map
                            .keys()
                            .filter_map(|job_id| {
                                refreshed_background_jobs.remove(job_id).then_some(*job_id)
                            })
                            .collect()
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    let cdc_table_snapshot_splits =
                        reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
                            .await?;

                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        recovery_context,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        mv_depended_subscriptions,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_splits,
                    })
                }
            }
        }
    }

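    /// Per-database counterpart of `reload_runtime_info_impl`: performs the same cleanup and
    /// loading steps scoped to `database_id` and returns a `DatabaseRuntimeInfoSnapshot`.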
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let recovery_context = self.load_recovery_context(Some(database_id)).await?;

        let missing_background_jobs = background_jobs
            .iter()
            .filter(|job_id| {
                !recovery_context
                    .fragment_context
                    .job_map
                    .contains_key(*job_id)
            })
            .copied()
            .collect_vec();
        if !missing_background_jobs.is_empty() {
            warn!(
                database_id = %database_id,
                missing_job_ids = ?missing_background_jobs,
                "background jobs missing in rendered info"
            );
        }

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs.iter().filter_map(|job_id| {
                        recovery_context
                            .fragment_context
                            .job_fragments
                            .get(job_id)
                            .map(|job| (*job_id, job))
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(DatabaseRuntimeInfoSnapshot {
            recovery_context,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            background_jobs,
            cdc_table_snapshot_splits,
        })
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode {
                node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                    PbUpstreamSinkUnionNode {
                        init_upstreams: vec![],
                    },
                ))),
                ..Default::default()
            },
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let upstream_sink_recovery = HashMap::from([(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![UpstreamSinkInfo {
                    sink_id: SinkId::new(1),
                    sink_fragment_id,
                    sink_output_fields: vec![],
                    sink_original_target_columns: vec![],
                    project_exprs: vec![],
                    new_sink_downstream: DownstreamFragmentRelation {
                        downstream_fragment_id: FragmentId::new(300),
                        dispatcher_type: DispatcherType::Hash,
                        dist_key_indices: vec![],
                        output_mapping: PbDispatchOutputMapping::default(),
                    },
                }],
            },
        )]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .unwrap()
            .get(&job_id)
            .unwrap()
            .get(&fragment_id)
            .unwrap();

        let PbNodeBody::UpstreamSinkUnion(updated_union) =
            updated.nodes.node_body.as_ref().unwrap()
        else {
            panic!("expected upstream sink union node");
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        assert_eq!(
            updated_union.init_upstreams[0].upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(
                fragment_id,
                InflightFragmentInfo {
                    fragment_id,
                    distribution_type: DistributionType::Hash,
                    fragment_type_mask: FragmentTypeMask::empty(),
                    vnode_count: 1,
                    nodes: PbStreamNode::default(),
                    actors: HashMap::from([(
                        actor_id,
                        InflightActorInfo {
                            worker_id: WorkerId::new(1),
                            vnode_bitmap: None,
                            splits: vec![],
                        },
                    )]),
                    state_table_ids: HashSet::new(),
                },
            );

        let job_extra_info = HashMap::from([(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                adaptive_parallelism_strategy: None,
                job_definition: "definition".to_owned(),
                backfill_orders: None,
            },
        )]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();

        let actor = stream_actors.get(&actor_id).unwrap();
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}
983}