risingwave_meta/barrier/context/
recovery.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, max, min};
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::atomic::AtomicU32;
19
20use anyhow::{Context, anyhow};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::id::JobId;
25use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
26use risingwave_connector::source::SplitImpl;
27use risingwave_hummock_sdk::change_log::TableChangeLogs;
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_meta_model::SinkId;
30use risingwave_pb::stream_plan::stream_node::PbNodeBody;
31use sea_orm::TransactionTrait;
32use thiserror_ext::AsReport;
33use tracing::{info, warn};
34
35use super::BarrierWorkerRuntimeInfoSnapshot;
36use crate::MetaResult;
37use crate::barrier::DatabaseRuntimeInfoSnapshot;
38use crate::barrier::checkpoint::{
39    BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, BatchRefreshRenderResult,
40};
41use crate::barrier::context::GlobalBarrierWorkerContextImpl;
42use crate::barrier::rpc::to_partial_graph_id;
43use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
44use crate::controller::scale::{
45    FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
46    render_actor_assignments,
47};
48use crate::controller::utils::StreamingJobExtraInfo;
49use crate::manager::ActiveStreamingWorkerNodes;
50use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
51use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
52use crate::stream::cdc::reload_cdc_table_snapshot_splits;
53use crate::stream::{
54    SourceChange, StreamFragmentGraph, UpstreamSinkInfo, cleanup_dropped_streaming_jobs,
55};
56
57#[derive(Debug)]
58pub(crate) struct UpstreamSinkRecoveryInfo {
59    target_fragment_id: FragmentId,
60    upstream_infos: Vec<UpstreamSinkInfo>,
61}
62
63#[derive(Debug)]
64pub struct LoadedRecoveryContext {
65    pub fragment_context: LoadedFragmentContext,
66    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
67    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
68    pub fragment_relations: FragmentDownstreamRelation,
69}
70
71impl LoadedRecoveryContext {
72    fn empty(fragment_context: LoadedFragmentContext) -> Self {
73        Self {
74            fragment_context,
75            job_extra_info: HashMap::new(),
76            upstream_sink_recovery: HashMap::new(),
77            fragment_relations: FragmentDownstreamRelation::default(),
78        }
79    }
80}
81
82pub struct RenderedDatabaseRuntimeInfo {
83    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
84    pub stream_actors: HashMap<ActorId, StreamActor>,
85    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
86    /// Batch refresh jobs rendered during `render_runtime_info`.
87    pub batch_refresh: HashMap<JobId, BatchRefreshRenderResult>,
88}
89
90pub fn render_runtime_info(
91    actor_id_generator: &AtomicU32,
92    worker_nodes: &ActiveStreamingWorkerNodes,
93    recovery_context: &LoadedRecoveryContext,
94    database_id: DatabaseId,
95) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
96    let Some(mut per_database_context) =
97        recovery_context.fragment_context.for_database(database_id)
98    else {
99        return Ok(None);
100    };
101
102    assert!(!per_database_context.is_empty());
103
104    // Extract batch refresh jobs before rendering via `render_actor_assignments`.
105    // They will be rendered independently using the unified `render_actors_and_build_job_info`.
106    let batch_refresh_job_ids: HashSet<JobId> = per_database_context
107        .job_map
108        .iter()
109        .filter(|(_, model)| model.refresh_interval_sec.is_some())
110        .map(|(job_id, _)| *job_id)
111        .collect();
112
113    let mut batch_refresh_logical = HashMap::new();
114    if !batch_refresh_job_ids.is_empty() {
115        let batch_refresh_fragment_ids: HashSet<FragmentId> = batch_refresh_job_ids
116            .iter()
117            .flat_map(|job_id| {
118                per_database_context
119                    .job_fragments
120                    .get(job_id)
121                    .unwrap()
122                    .keys()
123                    .copied()
124            })
125            .collect();
126
127        for &job_id in &batch_refresh_job_ids {
128            let fragments = per_database_context.job_fragments.remove(&job_id).unwrap();
129            let downstreams = fragments
130                .keys()
131                .filter_map(|fid| {
132                    recovery_context
133                        .fragment_relations
134                        .get(fid)
135                        .map(|r| (*fid, r.clone()))
136                })
137                .collect();
138            batch_refresh_logical.insert(
139                job_id,
140                BatchRefreshLogicalFragments {
141                    fragments,
142                    downstreams,
143                },
144            );
145        }
146
147        // Remove ensembles that only contain batch refresh fragments.
148        per_database_context.ensembles.retain(|ensemble| {
149            !ensemble
150                .component_fragments()
151                .all(|fid| batch_refresh_fragment_ids.contains(&fid))
152        });
153    }
154
155    // Render actors for each batch refresh job.
156    let mut batch_refresh = HashMap::new();
157    for (job_id, logical) in batch_refresh_logical {
158        let extra = recovery_context
159            .job_extra_info
160            .get(&job_id)
161            .expect("should have extra info");
162        let streaming_job_model = per_database_context
163            .job_map
164            .get(&job_id)
165            .expect("should have streaming job model");
166        let database_model = &per_database_context.database_map[&database_id];
167        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
168
169        let render_result = BatchRefreshJobCheckpointControl::render_actors_and_build_job_info(
170            &logical.fragments,
171            &logical.downstreams,
172            &extra.job_definition,
173            actor_id_generator,
174            worker_nodes.current(),
175            &database_model.resource_group,
176            streaming_job_model,
177            partial_graph_id,
178        )?;
179
180        batch_refresh.insert(job_id, render_result);
181    }
182
183    // If all fragments were batch refresh, no normal rendering needed.
184    if per_database_context.ensembles.is_empty() {
185        return Ok(Some(RenderedDatabaseRuntimeInfo {
186            job_infos: HashMap::new(),
187            stream_actors: HashMap::new(),
188            source_splits: HashMap::new(),
189            batch_refresh,
190        }));
191    }
192
193    let RenderedGraph { mut fragments, .. } = render_actor_assignments(
194        actor_id_generator,
195        worker_nodes.current(),
196        &per_database_context,
197    )?;
198
199    let single_database = match fragments.remove(&database_id) {
200        Some(info) => info,
201        None => return Ok(None),
202    };
203
204    let mut database_map = HashMap::from([(database_id, single_database)]);
205    recovery_table_with_upstream_sinks(
206        &mut database_map,
207        &recovery_context.upstream_sink_recovery,
208    )?;
209    let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;
210
211    let job_infos = database_map
212        .remove(&database_id)
213        .expect("database entry must exist");
214
215    let mut source_splits = HashMap::new();
216    for fragment_infos in job_infos.values() {
217        for fragment in fragment_infos.values() {
218            for (actor_id, info) in &fragment.actors {
219                source_splits.insert(*actor_id, info.splits.clone());
220            }
221        }
222    }
223
224    Ok(Some(RenderedDatabaseRuntimeInfo {
225        job_infos,
226        stream_actors,
227        source_splits,
228        batch_refresh,
229    }))
230}
231
232/// For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and does not persist the
233/// newly added or deleted upstreams in meta-store. Therefore, when restoring jobs, we need to restore the
234/// information required by the operator based on the current state of the upstream (sink) and downstream (table) of
235/// the operator. All necessary metadata must be preloaded before rendering.
236fn recovery_table_with_upstream_sinks(
237    inflight_jobs: &mut FragmentRenderMap,
238    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
239) -> MetaResult<()> {
240    if upstream_sink_recovery.is_empty() {
241        return Ok(());
242    }
243
244    let mut seen_jobs = HashSet::new();
245
246    for jobs in inflight_jobs.values_mut() {
247        for (job_id, fragments) in jobs {
248            if !seen_jobs.insert(*job_id) {
249                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
250            }
251
252            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
253                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
254                    refill_upstream_sink_union_in_table(
255                        &mut target_fragment.nodes,
256                        &recovery.upstream_infos,
257                    );
258                } else {
259                    return Err(anyhow::anyhow!(
260                        "target fragment {} not found for upstream sink recovery of job {}",
261                        recovery.target_fragment_id,
262                        job_id
263                    )
264                    .into());
265                }
266            }
267        }
268    }
269
270    Ok(())
271}
272
273/// Assembles `StreamActor` instances from rendered fragment info and job context.
274///
275/// This function combines the actor assignments from `FragmentRenderMap` with
276/// runtime context (timezone, config, definition) from `StreamingJobExtraInfo`
277/// to produce the final `StreamActor` structures needed for recovery.
278fn build_stream_actors(
279    all_info: &FragmentRenderMap,
280    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
281) -> MetaResult<HashMap<ActorId, StreamActor>> {
282    let mut stream_actors = HashMap::new();
283
284    for (job_id, streaming_info) in all_info.values().flatten() {
285        let extra_info = job_extra_info
286            .get(job_id)
287            .cloned()
288            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
289        let expr_context = extra_info.stream_context().to_expr_context();
290        let job_definition = extra_info.job_definition;
291        let config_override = extra_info.config_override;
292
293        for (fragment_id, fragment_infos) in streaming_info {
294            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
295                stream_actors.insert(
296                    *actor_id,
297                    StreamActor {
298                        actor_id: *actor_id,
299                        fragment_id: *fragment_id,
300                        vnode_bitmap: vnode_bitmap.clone(),
301                        mview_definition: job_definition.clone(),
302                        expr_context: Some(expr_context.clone()),
303                        config_override: config_override.clone(),
304                    },
305                );
306            }
307        }
308    }
309    Ok(stream_actors)
310}
311
312impl GlobalBarrierWorkerContextImpl {
313    async fn apply_pre_applied_drop_cancel(
314        &self,
315        database_id: Option<DatabaseId>,
316    ) -> MetaResult<bool> {
317        let drop_cancel = self.scheduled_barriers.pre_apply_drop_cancel(database_id);
318        let has_drop_streaming_jobs = !drop_cancel.streaming_job_ids.is_empty();
319        cleanup_dropped_streaming_jobs(
320            &self.refresh_manager,
321            &self.hummock_manager,
322            &self.metadata_manager,
323            drop_cancel.streaming_job_ids,
324            drop_cancel.dropped_state_table_ids,
325            "drop_streaming_jobs",
326        )
327        .await?;
328        Ok(has_drop_streaming_jobs)
329    }
330
331    /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted.
332    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
333        self.metadata_manager
334            .catalog_controller
335            .clean_dirty_subscription(database_id)
336            .await?;
337
338        let cleaned_dirty_jobs = self
339            .metadata_manager
340            .catalog_controller
341            .clean_dirty_creating_jobs(database_id)
342            .await?;
343        if database_id.is_some() {
344            // Per-database recovery does not run the global Hummock purge below. Unregister the
345            // dirty jobs cleaned in this database through the normal dropped-table cleanup path.
346            cleanup_dropped_streaming_jobs(
347                &self.refresh_manager,
348                &self.hummock_manager,
349                &self.metadata_manager,
350                cleaned_dirty_jobs.streaming_job_ids,
351                cleaned_dirty_jobs.dropped_table_ids,
352                "clean_dirty_creating_jobs",
353            )
354            .await?;
355        }
356        self.metadata_manager
357            .reset_all_refresh_jobs_to_idle()
358            .await?;
359
360        // unregister cleaned sources.
361        self.source_manager
362            .apply_source_change(SourceChange::DropSource {
363                dropped_source_ids: cleaned_dirty_jobs.source_ids,
364            })
365            .await;
366
367        Ok(())
368    }
369
370    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
371        if let Some(database_id) = database_id {
372            let sink_ids = self
373                .metadata_manager
374                .catalog_controller
375                .list_sink_ids(Some(database_id))
376                .await?;
377            self.sink_manager.stop_sink_coordinator(sink_ids).await;
378        } else {
379            self.sink_manager.reset().await;
380        }
381        Ok(())
382    }
383
384    async fn abort_dirty_pending_sink_state(
385        &self,
386        database_id: Option<DatabaseId>,
387    ) -> MetaResult<()> {
388        let pending_sinks: HashSet<SinkId> = self
389            .metadata_manager
390            .catalog_controller
391            .list_all_pending_sinks(database_id)
392            .await?;
393
394        if pending_sinks.is_empty() {
395            return Ok(());
396        }
397
398        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
399            .metadata_manager
400            .catalog_controller
401            .fetch_sink_with_state_table_ids(pending_sinks)
402            .await?;
403
404        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();
405
406        for (sink_id, table_ids) in sink_with_state_tables {
407            let Some(table_id) = table_ids.first() else {
408                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
409            };
410
411            self.hummock_manager
412                .on_current_version(|version| -> MetaResult<()> {
413                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
414                        assert!(
415                            sink_committed_epoch
416                                .insert(sink_id, committed_epoch)
417                                .is_none()
418                        );
419                        Ok(())
420                    } else {
421                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
422                    }
423                })
424                .await?;
425        }
426
427        self.metadata_manager
428            .catalog_controller
429            .abort_pending_sink_epochs(sink_committed_epoch)
430            .await?;
431
432        Ok(())
433    }
434
435    async fn purge_state_table_from_hummock(
436        &self,
437        all_state_table_ids: &HashSet<TableId>,
438    ) -> MetaResult<()> {
439        self.hummock_manager.purge(all_state_table_ids).await?;
440        Ok(())
441    }
442
443    async fn list_background_job_progress(
444        &self,
445        database_id: Option<DatabaseId>,
446    ) -> MetaResult<HashSet<JobId>> {
447        let mgr = &self.metadata_manager;
448        mgr.catalog_controller
449            .list_background_creating_jobs(false, database_id)
450            .await
451    }
452
453    async fn load_recovery_context(
454        &self,
455        database_id: Option<DatabaseId>,
456    ) -> MetaResult<LoadedRecoveryContext> {
457        let inner = self
458            .metadata_manager
459            .catalog_controller
460            .get_inner_read_guard()
461            .await;
462        let txn = inner.db.begin().await?;
463
464        let fragment_context = self
465            .metadata_manager
466            .catalog_controller
467            .load_fragment_context_in_txn(&txn, database_id)
468            .await
469            .inspect_err(|err| {
470                warn!(error = %err.as_report(), "load fragment context failed");
471            })?;
472
473        if fragment_context.is_empty() {
474            return Ok(LoadedRecoveryContext::empty(fragment_context));
475        }
476
477        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
478        let job_extra_info = self
479            .metadata_manager
480            .catalog_controller
481            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
482            .await?;
483
484        let mut upstream_targets = HashMap::new();
485        for fragment in fragment_context
486            .job_fragments
487            .values()
488            .flat_map(|fragments| fragments.values())
489        {
490            let mut has_upstream_union = false;
491            visit_stream_node_cont(&fragment.nodes, |node| {
492                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
493                    has_upstream_union = true;
494                    false
495                } else {
496                    true
497                }
498            });
499
500            if has_upstream_union
501                && let Some(previous) =
502                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
503            {
504                bail!(
505                    "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
506                    fragment.job_id,
507                    fragment.fragment_id,
508                    previous
509                );
510            }
511        }
512
513        let mut upstream_sink_recovery = HashMap::new();
514        if !upstream_targets.is_empty() {
515            let tables = self
516                .metadata_manager
517                .catalog_controller
518                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
519                .await?;
520
521            for table in tables {
522                let job_id = table.id.as_job_id();
523                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
524                    // This should not happen unless catalog changes or legacy metadata are involved.
525                    tracing::debug!(
526                        job_id = %job_id,
527                        "upstream sink union target fragment not found for table"
528                    );
529                    continue;
530                };
531
532                let upstream_infos = self
533                    .metadata_manager
534                    .catalog_controller
535                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
536                    .await?;
537
538                upstream_sink_recovery.insert(
539                    job_id,
540                    UpstreamSinkRecoveryInfo {
541                        target_fragment_id: *target_fragment_id,
542                        upstream_infos,
543                    },
544                );
545            }
546        }
547
548        let fragment_relations = self
549            .metadata_manager
550            .catalog_controller
551            .get_fragment_downstream_relations_in_txn(
552                &txn,
553                fragment_context
554                    .job_fragments
555                    .values()
556                    .flat_map(|fragments| fragments.keys().copied())
557                    .collect_vec(),
558            )
559            .await?;
560
561        Ok(LoadedRecoveryContext {
562            fragment_context,
563            job_extra_info,
564            upstream_sink_recovery,
565            fragment_relations,
566        })
567    }
568
569    #[expect(clippy::type_complexity)]
570    fn resolve_hummock_version_epochs(
571        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
572        version: &HummockVersion,
573        table_change_log: &TableChangeLogs,
574    ) -> MetaResult<(
575        HashMap<TableId, u64>,
576        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
577    )> {
578        let table_committed_epoch: HashMap<_, _> = version
579            .state_table_info
580            .info()
581            .iter()
582            .map(|(table_id, info)| (*table_id, info.committed_epoch))
583            .collect();
584        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
585            Ok(*table_committed_epoch
586                .get(&table_id)
587                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
588        };
589        let mut min_downstream_committed_epochs = HashMap::new();
590        for (job_id, fragments) in background_jobs {
591            let job_committed_epoch = {
592                let mut table_id_iter = fragments
593                    .values()
594                    .flat_map(|fragment| fragment.state_table_ids.iter().copied());
595                let Some(first_table_id) = table_id_iter.next() else {
596                    bail!("job {} has no state table", job_id);
597                };
598                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
599                for table_id in table_id_iter {
600                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
601                    if job_committed_epoch != table_committed_epoch {
602                        bail!(
603                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
604                            first_table_id,
605                            job_committed_epoch,
606                            table_id,
607                            table_committed_epoch,
608                            job_id
609                        );
610                    }
611                }
612
613                job_committed_epoch
614            };
615            if let (Some(snapshot_backfill_info), _) =
616                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
617                    fragments
618                        .values()
619                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
620                )?
621            {
622                for (upstream_table, snapshot_epoch) in
623                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
624                {
625                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
626                        anyhow!(
627                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
628                            job_id, upstream_table
629                        )
630                    })?;
631                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
632                    match min_downstream_committed_epochs.entry(upstream_table) {
633                        Entry::Occupied(entry) => {
634                            let prev_min_epoch = entry.into_mut();
635                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
636                        }
637                        Entry::Vacant(entry) => {
638                            entry.insert(pinned_epoch);
639                        }
640                    }
641                }
642            }
643        }
644        let mut log_epochs = HashMap::new();
645        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
646            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
647            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
648                Ordering::Less => {
649                    bail!(
650                        "downstream epoch {} later than upstream epoch {} of table {}",
651                        downstream_committed_epoch,
652                        upstream_committed_epoch,
653                        upstream_table_id
654                    );
655                }
656                Ordering::Equal => {
657                    continue;
658                }
659                Ordering::Greater => {
660                    if let Some(table_change_log) = table_change_log.get(&upstream_table_id) {
661                        let epochs = table_change_log
662                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
663                            .map(|epoch_log| {
664                                (
665                                    epoch_log.non_checkpoint_epochs.clone(),
666                                    epoch_log.checkpoint_epoch,
667                                )
668                            })
669                            .collect_vec();
670                        let first_epochs = epochs.first();
671                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
672                            && *first_checkpoint_epoch == downstream_committed_epoch
673                        {
674                        } else {
675                            bail!(
676                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
677                                epochs,
678                                upstream_table_id,
679                                downstream_committed_epoch
680                            );
681                        }
682                        log_epochs
683                            .try_insert(upstream_table_id, epochs)
684                            .expect("non-duplicated");
685                    } else {
686                        bail!(
687                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
688                            upstream_table_id,
689                            upstream_committed_epoch,
690                            downstream_committed_epoch
691                        );
692                    }
693                }
694            }
695        }
696        Ok((table_committed_epoch, log_epochs))
697    }
698
699    pub(super) async fn reload_runtime_info_impl(
700        &self,
701    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
702        {
703            {
704                {
705                    self.clean_dirty_streaming_jobs(None)
706                        .await
707                        .context("clean dirty streaming jobs")?;
708
709                    self.reset_sink_coordinator(None)
710                        .await
711                        .context("reset sink coordinator")?;
712                    self.abort_dirty_pending_sink_state(None)
713                        .await
714                        .context("abort dirty pending sink state")?;
715
716                    // Background job progress needs to be recovered.
717                    tracing::info!("recovering background job progress");
718                    let initial_background_jobs = self
719                        .list_background_job_progress(None)
720                        .await
721                        .context("recover background job progress should not fail")?;
722
723                    tracing::info!("recovered background job progress");
724
725                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
726                    let _ = self.apply_pre_applied_drop_cancel(None).await?;
727                    self.metadata_manager
728                        .catalog_controller
729                        .cleanup_dropped_tables()
730                        .await;
731
732                    let active_streaming_nodes =
733                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
734                            .await?;
735
736                    let background_streaming_jobs =
737                        initial_background_jobs.iter().cloned().collect_vec();
738
739                    tracing::info!(
740                        "background streaming jobs: {:?} total {}",
741                        background_streaming_jobs,
742                        background_streaming_jobs.len()
743                    );
744
745                    let unreschedulable_jobs = {
746                        let mut unreschedulable_jobs = HashSet::new();
747
748                        for job_id in background_streaming_jobs {
749                            let scan_types = self
750                                .metadata_manager
751                                .get_job_backfill_scan_types(job_id)
752                                .await?;
753
754                            if scan_types
755                                .values()
756                                .any(|scan_type| !scan_type.is_reschedulable(false))
757                            {
758                                unreschedulable_jobs.insert(job_id);
759                            }
760                        }
761
762                        unreschedulable_jobs
763                    };
764
765                    if !unreschedulable_jobs.is_empty() {
766                        info!(
767                            "unreschedulable background jobs: {:?}",
768                            unreschedulable_jobs
769                        );
770                    }
771
772                    // Resolve actor info for recovery. If there's no actor to recover, most of the
773                    // following steps will be no-op, while the compute nodes will still be reset.
774                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
775                    if !unreschedulable_jobs.is_empty() {
776                        bail!(
777                            "Recovery for unreschedulable background jobs is not yet implemented. \
778                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
779                            unreschedulable_jobs
780                        );
781                    }
782
783                    let mut recovery_context = self.load_recovery_context(None).await?;
784                    if self.apply_pre_applied_drop_cancel(None).await? {
785                        recovery_context = self.load_recovery_context(None).await?;
786                    }
787
788                    self.purge_state_table_from_hummock(
789                        &recovery_context
790                            .fragment_context
791                            .job_fragments
792                            .values()
793                            .flat_map(|fragments| fragments.values())
794                            .flat_map(|fragment| fragment.state_table_ids.iter().copied())
795                            .collect(),
796                    )
797                    .await
798                    .context("purge state table from hummock")?;
799
800                    let (state_table_committed_epochs, state_table_log_epochs) = self
801                        .hummock_manager
802                        .on_current_version_and_table_change_log(|version, table_change_log| {
803                            Self::resolve_hummock_version_epochs(
804                                recovery_context
805                                    .fragment_context
806                                    .job_fragments
807                                    .iter()
808                                    .filter_map(|(job_id, job)| {
809                                        initial_background_jobs
810                                            .contains(job_id)
811                                            .then_some((*job_id, job))
812                                    }),
813                                version,
814                                table_change_log,
815                            )
816                        })
817                        .await?;
818
819                    let mv_depended_subscriptions = self
820                        .metadata_manager
821                        .get_mv_depended_subscriptions(None)
822                        .await?;
823
824                    // Refresh background job progress for the final snapshot to reflect any catalog changes.
825                    let background_jobs = {
826                        let mut refreshed_background_jobs = self
827                            .list_background_job_progress(None)
828                            .await
829                            .context("recover background job progress should not fail")?;
830                        recovery_context
831                            .fragment_context
832                            .job_map
833                            .keys()
834                            .filter_map(|job_id| {
835                                refreshed_background_jobs.remove(job_id).then_some(*job_id)
836                            })
837                            .collect()
838                    };
839
840                    let database_infos = self
841                        .metadata_manager
842                        .catalog_controller
843                        .list_databases()
844                        .await?;
845
846                    let cdc_table_snapshot_splits =
847                        reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
848                            .await?;
849
850                    Ok(BarrierWorkerRuntimeInfoSnapshot {
851                        active_streaming_nodes,
852                        recovery_context,
853                        state_table_committed_epochs,
854                        state_table_log_epochs,
855                        mv_depended_subscriptions,
856                        background_jobs,
857                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
858                        database_infos,
859                        cdc_table_snapshot_splits,
860                    })
861                }
862            }
863        }
864    }
865
866    pub(super) async fn reload_database_runtime_info_impl(
867        &self,
868        database_id: DatabaseId,
869    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
870        self.clean_dirty_streaming_jobs(Some(database_id))
871            .await
872            .context("clean dirty streaming jobs")?;
873
874        self.reset_sink_coordinator(Some(database_id))
875            .await
876            .context("reset sink coordinator")?;
877        self.abort_dirty_pending_sink_state(Some(database_id))
878            .await
879            .context("abort dirty pending sink state")?;
880
881        // Background job progress needs to be recovered.
882        tracing::info!(
883            ?database_id,
884            "recovering background job progress of database"
885        );
886
887        let background_jobs = self
888            .list_background_job_progress(Some(database_id))
889            .await
890            .context("recover background job progress of database should not fail")?;
891        tracing::info!(?database_id, "recovered background job progress");
892
893        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
894        let _ = self
895            .apply_pre_applied_drop_cancel(Some(database_id))
896            .await?;
897
898        let recovery_context = self.load_recovery_context(Some(database_id)).await?;
899
900        let missing_background_jobs = background_jobs
901            .iter()
902            .filter(|job_id| {
903                !recovery_context
904                    .fragment_context
905                    .job_map
906                    .contains_key(*job_id)
907            })
908            .copied()
909            .collect_vec();
910        if !missing_background_jobs.is_empty() {
911            warn!(
912                database_id = %database_id,
913                missing_job_ids = ?missing_background_jobs,
914                "background jobs missing in rendered info"
915            );
916        }
917
918        let (state_table_committed_epochs, state_table_log_epochs) = self
919            .hummock_manager
920            .on_current_version_and_table_change_log(|version, table_change_log| {
921                Self::resolve_hummock_version_epochs(
922                    background_jobs.iter().filter_map(|job_id| {
923                        recovery_context
924                            .fragment_context
925                            .job_fragments
926                            .get(job_id)
927                            .map(|job| (*job_id, job))
928                    }),
929                    version,
930                    table_change_log,
931                )
932            })
933            .await?;
934
935        let mv_depended_subscriptions = self
936            .metadata_manager
937            .get_mv_depended_subscriptions(Some(database_id))
938            .await?;
939
940        let cdc_table_snapshot_splits =
941            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
942                .await?;
943
944        self.refresh_manager
945            .remove_trackers_by_database(database_id);
946
947        Ok(DatabaseRuntimeInfoSnapshot {
948            recovery_context,
949            state_table_committed_epochs,
950            state_table_log_epochs,
951            mv_depended_subscriptions,
952            background_jobs,
953            cdc_table_snapshot_splits,
954        })
955    }
956}
957
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    /// Builds the fragment shape shared by the tests in this module: hash distribution, an
    /// empty fragment type mask, a single vnode and no state tables.
    fn hash_fragment(
        fragment_id: FragmentId,
        nodes: PbStreamNode,
        actors: HashMap<ActorId, InflightActorInfo>,
    ) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes,
            actors,
            state_table_ids: HashSet::new(),
        }
    }

    /// Creates a render map that holds exactly `fragment`, keyed by its own fragment id under
    /// the given database and job.
    fn single_fragment_render_map(
        database_id: DatabaseId,
        job_id: JobId,
        fragment: InflightFragmentInfo,
    ) -> FragmentRenderMap {
        let mut render_map: FragmentRenderMap = HashMap::new();
        render_map
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment.fragment_id, fragment);
        render_map
    }

    /// An upstream-sink-union node that starts without upstreams must receive one init
    /// upstream, pointing at the sink fragment, from the recovery info.
    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        let empty_union = PbUpstreamSinkUnionNode {
            init_upstreams: vec![],
        };
        let union_nodes = PbStreamNode {
            node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(empty_union))),
            ..Default::default()
        };
        let mut inflight_jobs = single_fragment_render_map(
            database_id,
            job_id,
            hash_fragment(fragment_id, union_nodes, HashMap::new()),
        );

        let sink_info = UpstreamSinkInfo {
            sink_id: SinkId::new(1),
            sink_fragment_id,
            sink_output_fields: vec![],
            sink_original_target_columns: vec![],
            project_exprs: vec![],
            new_sink_downstream: DownstreamFragmentRelation {
                downstream_fragment_id: FragmentId::new(300),
                dispatcher_type: DispatcherType::Hash,
                dist_key_indices: vec![],
                output_mapping: PbDispatchOutputMapping::default(),
            },
        };
        let recovery_info = UpstreamSinkRecoveryInfo {
            target_fragment_id: fragment_id,
            upstream_infos: vec![sink_info],
        };
        let upstream_sink_recovery = HashMap::from([(job_id, recovery_info)]);

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .and_then(|jobs| jobs.get(&job_id))
            .and_then(|fragments| fragments.get(&fragment_id))
            .unwrap();

        let union_node = match updated.nodes.node_body.as_ref().unwrap() {
            PbNodeBody::UpstreamSinkUnion(union_node) => union_node,
            _ => panic!("expected upstream sink union node"),
        };

        assert_eq!(union_node.init_upstreams.len(), 1);
        let first_upstream = &union_node.init_upstreams[0];
        assert_eq!(
            first_upstream.upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    /// The built stream actor must carry the definition, config override and timezone taken
    /// from the preloaded per-job extra info.
    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        let only_actor = InflightActorInfo {
            worker_id: WorkerId::new(1),
            vnode_bitmap: None,
            splits: vec![],
        };
        let inflight_jobs = single_fragment_render_map(
            database_id,
            job_id,
            hash_fragment(
                fragment_id,
                PbStreamNode::default(),
                HashMap::from([(actor_id, only_actor)]),
            ),
        );

        let extra_info = StreamingJobExtraInfo {
            timezone: Some("UTC".to_owned()),
            config_override: "cfg".into(),
            job_definition: "definition".to_owned(),
            backfill_orders: None,
            refresh_interval_sec: None,
        };
        let job_extra_info = HashMap::from([(job_id, extra_info)]);

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();
        let actor = stream_actors.get(&actor_id).unwrap();

        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");

        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}