risingwave_meta/barrier/context/
recovery.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, max, min};
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::sync::atomic::AtomicU32;
19
20use anyhow::{Context, anyhow};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::id::JobId;
25use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
26use risingwave_connector::source::SplitImpl;
27use risingwave_hummock_sdk::change_log::TableChangeLogs;
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_meta_model::SinkId;
30use risingwave_pb::stream_plan::stream_node::PbNodeBody;
31use sea_orm::TransactionTrait;
32use thiserror_ext::AsReport;
33use tracing::{info, warn};
34
35use super::BarrierWorkerRuntimeInfoSnapshot;
36use crate::MetaResult;
37use crate::barrier::DatabaseRuntimeInfoSnapshot;
38use crate::barrier::checkpoint::{
39    BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, BatchRefreshRenderResult,
40};
41use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
42use crate::barrier::progress::TrackingJob;
43use crate::barrier::rpc::to_partial_graph_id;
44use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
45use crate::controller::scale::{
46    FragmentRenderMap, LoadedFragment, LoadedFragmentContext, RenderedGraph,
47    render_actor_assignments,
48};
49use crate::controller::utils::StreamingJobExtraInfo;
50use crate::manager::ActiveStreamingWorkerNodes;
51use crate::model::{ActorId, FragmentDownstreamRelation, FragmentId, StreamActor};
52use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
53use crate::stream::cdc::reload_cdc_table_snapshot_splits;
54use crate::stream::{
55    SourceChange, StreamFragmentGraph, UpstreamSinkInfo, cleanup_dropped_streaming_jobs,
56};
57
/// Preloaded data used to restore the `UpstreamSinkUnion` node of one job during recovery.
#[derive(Debug)]
pub(crate) struct UpstreamSinkRecoveryInfo {
    /// Fragment of the job whose stream node tree contains the `UpstreamSinkUnion` node.
    target_fragment_id: FragmentId,
    /// Upstream sink infos that get refilled into the target fragment's nodes.
    upstream_infos: Vec<UpstreamSinkInfo>,
}
63
64#[derive(Debug)]
65pub struct LoadedRecoveryContext {
66    pub fragment_context: LoadedFragmentContext,
67    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
68    pub upstream_sink_recovery: HashMap<JobId, UpstreamSinkRecoveryInfo>,
69    pub fragment_relations: FragmentDownstreamRelation,
70}
71
72impl LoadedRecoveryContext {
73    fn empty(fragment_context: LoadedFragmentContext) -> Self {
74        Self {
75            fragment_context,
76            job_extra_info: HashMap::new(),
77            upstream_sink_recovery: HashMap::new(),
78            fragment_relations: FragmentDownstreamRelation::default(),
79        }
80    }
81}
82
/// Result of rendering the runtime info of one database in `render_runtime_info`.
pub struct RenderedDatabaseRuntimeInfo {
    /// Rendered fragment info of the normally rendered jobs, keyed by job id and fragment id.
    pub job_infos: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
    /// Stream actors built from `job_infos`, keyed by actor id.
    pub stream_actors: HashMap<ActorId, StreamActor>,
    /// Splits of every actor in `job_infos`, keyed by actor id.
    pub source_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// Batch refresh jobs rendered during `render_runtime_info`.
    pub batch_refresh: HashMap<JobId, BatchRefreshRenderResult>,
}
90
91pub fn render_runtime_info(
92    actor_id_generator: &AtomicU32,
93    worker_nodes: &ActiveStreamingWorkerNodes,
94    recovery_context: &LoadedRecoveryContext,
95    database_id: DatabaseId,
96) -> MetaResult<Option<RenderedDatabaseRuntimeInfo>> {
97    let Some(mut per_database_context) =
98        recovery_context.fragment_context.for_database(database_id)
99    else {
100        return Ok(None);
101    };
102
103    assert!(!per_database_context.is_empty());
104
105    // Extract batch refresh jobs before rendering via `render_actor_assignments`.
106    // They will be rendered independently using the unified `render_actors_and_build_job_info`.
107    let batch_refresh_job_ids: HashSet<JobId> = per_database_context
108        .job_map
109        .iter()
110        .filter(|(_, model)| model.refresh_interval_sec.is_some())
111        .map(|(job_id, _)| *job_id)
112        .collect();
113
114    let mut batch_refresh_logical = HashMap::new();
115    if !batch_refresh_job_ids.is_empty() {
116        let batch_refresh_fragment_ids: HashSet<FragmentId> = batch_refresh_job_ids
117            .iter()
118            .flat_map(|job_id| {
119                per_database_context
120                    .job_fragments
121                    .get(job_id)
122                    .unwrap()
123                    .keys()
124                    .copied()
125            })
126            .collect();
127
128        for &job_id in &batch_refresh_job_ids {
129            let fragments = per_database_context.job_fragments.remove(&job_id).unwrap();
130            let downstreams = fragments
131                .keys()
132                .filter_map(|fid| {
133                    recovery_context
134                        .fragment_relations
135                        .get(fid)
136                        .map(|r| (*fid, r.clone()))
137                })
138                .collect();
139            batch_refresh_logical.insert(
140                job_id,
141                BatchRefreshLogicalFragments {
142                    fragments,
143                    downstreams,
144                },
145            );
146        }
147
148        // Remove ensembles that only contain batch refresh fragments.
149        per_database_context.ensembles.retain(|ensemble| {
150            !ensemble
151                .component_fragments()
152                .all(|fid| batch_refresh_fragment_ids.contains(&fid))
153        });
154    }
155
156    // Render actors for each batch refresh job.
157    let mut batch_refresh = HashMap::new();
158    for (job_id, logical) in batch_refresh_logical {
159        let extra = recovery_context
160            .job_extra_info
161            .get(&job_id)
162            .expect("should have extra info");
163        let streaming_job_model = per_database_context
164            .job_map
165            .get(&job_id)
166            .expect("should have streaming job model");
167        let database_model = &per_database_context.database_map[&database_id];
168        let partial_graph_id = to_partial_graph_id(database_id, Some(job_id));
169
170        let render_result = BatchRefreshJobCheckpointControl::render_actors_and_build_job_info(
171            &logical.fragments,
172            &logical.downstreams,
173            &extra.job_definition,
174            actor_id_generator,
175            worker_nodes.current(),
176            &database_model.resource_group,
177            streaming_job_model,
178            partial_graph_id,
179        )?;
180
181        batch_refresh.insert(job_id, render_result);
182    }
183
184    // If all fragments were batch refresh, no normal rendering needed.
185    if per_database_context.ensembles.is_empty() {
186        return Ok(Some(RenderedDatabaseRuntimeInfo {
187            job_infos: HashMap::new(),
188            stream_actors: HashMap::new(),
189            source_splits: HashMap::new(),
190            batch_refresh,
191        }));
192    }
193
194    let RenderedGraph { mut fragments, .. } = render_actor_assignments(
195        actor_id_generator,
196        worker_nodes.current(),
197        &per_database_context,
198    )?;
199
200    let single_database = match fragments.remove(&database_id) {
201        Some(info) => info,
202        None => return Ok(None),
203    };
204
205    let mut database_map = HashMap::from([(database_id, single_database)]);
206    recovery_table_with_upstream_sinks(
207        &mut database_map,
208        &recovery_context.upstream_sink_recovery,
209    )?;
210    let stream_actors = build_stream_actors(&database_map, &recovery_context.job_extra_info)?;
211
212    let job_infos = database_map
213        .remove(&database_id)
214        .expect("database entry must exist");
215
216    let mut source_splits = HashMap::new();
217    for fragment_infos in job_infos.values() {
218        for fragment in fragment_infos.values() {
219            for (actor_id, info) in &fragment.actors {
220                source_splits.insert(*actor_id, info.splits.clone());
221            }
222        }
223    }
224
225    Ok(Some(RenderedDatabaseRuntimeInfo {
226        job_infos,
227        stream_actors,
228        source_splits,
229        batch_refresh,
230    }))
231}
232
233/// For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and does not persist the
234/// newly added or deleted upstreams in meta-store. Therefore, when restoring jobs, we need to restore the
235/// information required by the operator based on the current state of the upstream (sink) and downstream (table) of
236/// the operator. All necessary metadata must be preloaded before rendering.
237fn recovery_table_with_upstream_sinks(
238    inflight_jobs: &mut FragmentRenderMap,
239    upstream_sink_recovery: &HashMap<JobId, UpstreamSinkRecoveryInfo>,
240) -> MetaResult<()> {
241    if upstream_sink_recovery.is_empty() {
242        return Ok(());
243    }
244
245    let mut seen_jobs = HashSet::new();
246
247    for jobs in inflight_jobs.values_mut() {
248        for (job_id, fragments) in jobs {
249            if !seen_jobs.insert(*job_id) {
250                return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
251            }
252
253            if let Some(recovery) = upstream_sink_recovery.get(job_id) {
254                if let Some(target_fragment) = fragments.get_mut(&recovery.target_fragment_id) {
255                    refill_upstream_sink_union_in_table(
256                        &mut target_fragment.nodes,
257                        &recovery.upstream_infos,
258                    );
259                } else {
260                    return Err(anyhow::anyhow!(
261                        "target fragment {} not found for upstream sink recovery of job {}",
262                        recovery.target_fragment_id,
263                        job_id
264                    )
265                    .into());
266                }
267            }
268        }
269    }
270
271    Ok(())
272}
273
274/// Assembles `StreamActor` instances from rendered fragment info and job context.
275///
276/// This function combines the actor assignments from `FragmentRenderMap` with
277/// runtime context (timezone, config, definition) from `StreamingJobExtraInfo`
278/// to produce the final `StreamActor` structures needed for recovery.
279fn build_stream_actors(
280    all_info: &FragmentRenderMap,
281    job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
282) -> MetaResult<HashMap<ActorId, StreamActor>> {
283    let mut stream_actors = HashMap::new();
284
285    for (job_id, streaming_info) in all_info.values().flatten() {
286        let extra_info = job_extra_info
287            .get(job_id)
288            .cloned()
289            .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
290        let expr_context = extra_info.stream_context().to_expr_context();
291        let job_definition = extra_info.job_definition;
292        let config_override = extra_info.config_override;
293
294        for (fragment_id, fragment_infos) in streaming_info {
295            for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
296                stream_actors.insert(
297                    *actor_id,
298                    StreamActor {
299                        actor_id: *actor_id,
300                        fragment_id: *fragment_id,
301                        vnode_bitmap: vnode_bitmap.clone(),
302                        mview_definition: job_definition.clone(),
303                        expr_context: Some(expr_context.clone()),
304                        config_override: config_override.clone(),
305                    },
306                );
307            }
308        }
309    }
310    Ok(stream_actors)
311}
312
313impl GlobalBarrierWorkerContextImpl {
314    fn resolve_job_committed_epoch(
315        job_id: JobId,
316        fragments: &HashMap<FragmentId, LoadedFragment>,
317        state_table_committed_epochs: &HashMap<TableId, u64>,
318    ) -> MetaResult<u64> {
319        let mut table_id_iter = fragments
320            .values()
321            .flat_map(|fragment| fragment.state_table_ids.iter().copied());
322        let Some(first_table_id) = table_id_iter.next() else {
323            bail!("job {} has no state table", job_id);
324        };
325        let committed_epoch = *state_table_committed_epochs
326            .get(&first_table_id)
327            .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", first_table_id))?;
328        for table_id in table_id_iter {
329            let table_committed_epoch = *state_table_committed_epochs
330                .get(&table_id)
331                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?;
332            if committed_epoch != table_committed_epoch {
333                bail!(
334                    "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
335                    first_table_id,
336                    committed_epoch,
337                    table_id,
338                    table_committed_epoch,
339                    job_id
340                );
341            }
342        }
343
344        Ok(committed_epoch)
345    }
346
347    async fn finish_completed_batch_refresh_background_jobs(
348        &self,
349        recovery_context: &LoadedRecoveryContext,
350        state_table_committed_epochs: &HashMap<TableId, u64>,
351        background_jobs: &mut HashSet<JobId>,
352    ) -> MetaResult<()> {
353        let background_job_ids = background_jobs.iter().copied().collect_vec();
354        for job_id in background_job_ids {
355            let Some(job) = recovery_context.fragment_context.job_map.get(&job_id) else {
356                continue;
357            };
358            if job.refresh_interval_sec.is_none() {
359                continue;
360            }
361            let Some(fragments) = recovery_context.fragment_context.job_fragments.get(&job_id)
362            else {
363                continue;
364            };
365
366            let committed_epoch =
367                Self::resolve_job_committed_epoch(job_id, fragments, state_table_committed_epochs)?;
368            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
369                fragments
370                    .values()
371                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
372            )?
373            .0
374            .ok_or_else(|| anyhow!("batch refresh job {} has no snapshot backfill info", job_id))?;
375            let snapshot_epoch = snapshot_backfill_info
376                .upstream_mv_table_id_to_backfill_epoch
377                .values()
378                .find_map(|e| *e)
379                .unwrap_or(committed_epoch);
380            if committed_epoch < snapshot_epoch {
381                continue;
382            }
383
384            info!(
385                %job_id,
386                committed_epoch,
387                snapshot_epoch,
388                "finish completed batch refresh background job during recovery"
389            );
390            self.finish_creating_job(TrackingJob::recovered_from_fragment_nodes(
391                job_id,
392                fragments
393                    .iter()
394                    .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
395            ))
396            .await?;
397            background_jobs.remove(&job_id);
398        }
399
400        Ok(())
401    }
402
403    async fn apply_pre_applied_drop_cancel(
404        &self,
405        database_id: Option<DatabaseId>,
406    ) -> MetaResult<bool> {
407        let drop_cancel = self.scheduled_barriers.pre_apply_drop_cancel(database_id);
408        let has_drop_streaming_jobs = !drop_cancel.streaming_job_ids.is_empty();
409        cleanup_dropped_streaming_jobs(
410            &self.refresh_manager,
411            &self.hummock_manager,
412            &self.metadata_manager,
413            drop_cancel.streaming_job_ids,
414            drop_cancel.dropped_state_table_ids,
415            "drop_streaming_jobs",
416        )
417        .await?;
418        Ok(has_drop_streaming_jobs)
419    }
420
421    /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted.
422    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
423        self.metadata_manager
424            .catalog_controller
425            .clean_dirty_subscription(database_id)
426            .await?;
427
428        let cleaned_dirty_jobs = self
429            .metadata_manager
430            .catalog_controller
431            .clean_dirty_creating_jobs(database_id)
432            .await?;
433        if database_id.is_some() {
434            // Per-database recovery does not run the global Hummock purge below. Unregister the
435            // dirty jobs cleaned in this database through the normal dropped-table cleanup path.
436            cleanup_dropped_streaming_jobs(
437                &self.refresh_manager,
438                &self.hummock_manager,
439                &self.metadata_manager,
440                cleaned_dirty_jobs.streaming_job_ids,
441                cleaned_dirty_jobs.dropped_table_ids,
442                "clean_dirty_creating_jobs",
443            )
444            .await?;
445        }
446        self.metadata_manager
447            .reset_all_refresh_jobs_to_idle()
448            .await?;
449
450        // unregister cleaned sources.
451        self.source_manager
452            .apply_source_change(SourceChange::DropSource {
453                dropped_source_ids: cleaned_dirty_jobs.source_ids,
454            })
455            .await;
456
457        Ok(())
458    }
459
460    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
461        if let Some(database_id) = database_id {
462            let sink_ids = self
463                .metadata_manager
464                .catalog_controller
465                .list_sink_ids(Some(database_id))
466                .await?;
467            self.sink_manager.stop_sink_coordinator(sink_ids).await;
468        } else {
469            self.sink_manager.reset().await;
470        }
471        Ok(())
472    }
473
474    async fn abort_dirty_pending_sink_state(
475        &self,
476        database_id: Option<DatabaseId>,
477    ) -> MetaResult<()> {
478        let pending_sinks: HashSet<SinkId> = self
479            .metadata_manager
480            .catalog_controller
481            .list_all_pending_sinks(database_id)
482            .await?;
483
484        if pending_sinks.is_empty() {
485            return Ok(());
486        }
487
488        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
489            .metadata_manager
490            .catalog_controller
491            .fetch_sink_with_state_table_ids(pending_sinks)
492            .await?;
493
494        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();
495
496        for (sink_id, table_ids) in sink_with_state_tables {
497            let Some(table_id) = table_ids.first() else {
498                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
499            };
500
501            self.hummock_manager
502                .on_current_version(|version| -> MetaResult<()> {
503                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
504                        assert!(
505                            sink_committed_epoch
506                                .insert(sink_id, committed_epoch)
507                                .is_none()
508                        );
509                        Ok(())
510                    } else {
511                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
512                    }
513                })
514                .await?;
515        }
516
517        self.metadata_manager
518            .catalog_controller
519            .abort_pending_sink_epochs(sink_committed_epoch)
520            .await?;
521
522        Ok(())
523    }
524
525    async fn purge_state_table_from_hummock(
526        &self,
527        all_state_table_ids: &HashSet<TableId>,
528    ) -> MetaResult<()> {
529        self.hummock_manager.purge(all_state_table_ids).await?;
530        Ok(())
531    }
532
533    async fn list_background_job_progress(
534        &self,
535        database_id: Option<DatabaseId>,
536    ) -> MetaResult<HashSet<JobId>> {
537        let mgr = &self.metadata_manager;
538        mgr.catalog_controller
539            .list_background_creating_jobs(false, database_id)
540            .await
541    }
542
543    async fn load_recovery_context(
544        &self,
545        database_id: Option<DatabaseId>,
546    ) -> MetaResult<LoadedRecoveryContext> {
547        let inner = self
548            .metadata_manager
549            .catalog_controller
550            .get_inner_read_guard()
551            .await;
552        let txn = inner.db.begin().await?;
553
554        let fragment_context = self
555            .metadata_manager
556            .catalog_controller
557            .load_fragment_context_in_txn(&txn, database_id)
558            .await
559            .inspect_err(|err| {
560                warn!(error = %err.as_report(), "load fragment context failed");
561            })?;
562
563        if fragment_context.is_empty() {
564            return Ok(LoadedRecoveryContext::empty(fragment_context));
565        }
566
567        let job_ids = fragment_context.job_map.keys().copied().collect_vec();
568        let job_extra_info = self
569            .metadata_manager
570            .catalog_controller
571            .get_streaming_job_extra_info_in_txn(&txn, job_ids)
572            .await?;
573
574        let mut upstream_targets = HashMap::new();
575        for fragment in fragment_context
576            .job_fragments
577            .values()
578            .flat_map(|fragments| fragments.values())
579        {
580            let mut has_upstream_union = false;
581            visit_stream_node_cont(&fragment.nodes, |node| {
582                if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
583                    has_upstream_union = true;
584                    false
585                } else {
586                    true
587                }
588            });
589
590            if has_upstream_union
591                && let Some(previous) =
592                    upstream_targets.insert(fragment.job_id, fragment.fragment_id)
593            {
594                bail!(
595                    "multiple upstream sink union fragments found for job {}, fragment {}, kept {}",
596                    fragment.job_id,
597                    fragment.fragment_id,
598                    previous
599                );
600            }
601        }
602
603        let mut upstream_sink_recovery = HashMap::new();
604        if !upstream_targets.is_empty() {
605            let tables = self
606                .metadata_manager
607                .catalog_controller
608                .get_user_created_table_by_ids_in_txn(&txn, upstream_targets.keys().copied())
609                .await?;
610
611            for table in tables {
612                let job_id = table.id.as_job_id();
613                let Some(target_fragment_id) = upstream_targets.get(&job_id) else {
614                    // This should not happen unless catalog changes or legacy metadata are involved.
615                    tracing::debug!(
616                        job_id = %job_id,
617                        "upstream sink union target fragment not found for table"
618                    );
619                    continue;
620                };
621
622                let upstream_infos = self
623                    .metadata_manager
624                    .catalog_controller
625                    .get_all_upstream_sink_infos_in_txn(&txn, &table, *target_fragment_id as _)
626                    .await?;
627
628                upstream_sink_recovery.insert(
629                    job_id,
630                    UpstreamSinkRecoveryInfo {
631                        target_fragment_id: *target_fragment_id,
632                        upstream_infos,
633                    },
634                );
635            }
636        }
637
638        let fragment_relations = self
639            .metadata_manager
640            .catalog_controller
641            .get_fragment_downstream_relations_in_txn(
642                &txn,
643                fragment_context
644                    .job_fragments
645                    .values()
646                    .flat_map(|fragments| fragments.keys().copied())
647                    .collect_vec(),
648            )
649            .await?;
650
651        Ok(LoadedRecoveryContext {
652            fragment_context,
653            job_extra_info,
654            upstream_sink_recovery,
655            fragment_relations,
656        })
657    }
658
659    #[expect(clippy::type_complexity)]
660    fn resolve_hummock_version_epochs(
661        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, LoadedFragment>)>,
662        version: &HummockVersion,
663        table_change_log: &TableChangeLogs,
664    ) -> MetaResult<(
665        HashMap<TableId, u64>,
666        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
667    )> {
668        let table_committed_epoch: HashMap<_, _> = version
669            .state_table_info
670            .info()
671            .iter()
672            .map(|(table_id, info)| (*table_id, info.committed_epoch))
673            .collect();
674        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
675            Ok(*table_committed_epoch
676                .get(&table_id)
677                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
678        };
679        let mut min_downstream_committed_epochs = HashMap::new();
680        for (job_id, fragments) in background_jobs {
681            let job_committed_epoch =
682                Self::resolve_job_committed_epoch(job_id, fragments, &table_committed_epoch)?;
683            if let (Some(snapshot_backfill_info), _) =
684                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
685                    fragments
686                        .values()
687                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
688                )?
689            {
690                for (upstream_table, snapshot_epoch) in
691                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
692                {
693                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
694                        anyhow!(
695                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
696                            job_id, upstream_table
697                        )
698                    })?;
699                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
700                    match min_downstream_committed_epochs.entry(upstream_table) {
701                        Entry::Occupied(entry) => {
702                            let prev_min_epoch = entry.into_mut();
703                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
704                        }
705                        Entry::Vacant(entry) => {
706                            entry.insert(pinned_epoch);
707                        }
708                    }
709                }
710            }
711        }
712        let mut log_epochs = HashMap::new();
713        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
714            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
715            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
716                Ordering::Less => {
717                    bail!(
718                        "downstream epoch {} later than upstream epoch {} of table {}",
719                        downstream_committed_epoch,
720                        upstream_committed_epoch,
721                        upstream_table_id
722                    );
723                }
724                Ordering::Equal => {
725                    continue;
726                }
727                Ordering::Greater => {
728                    if let Some(table_change_log) = table_change_log.get(&upstream_table_id) {
729                        let epochs = table_change_log
730                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
731                            .map(|epoch_log| {
732                                (
733                                    epoch_log.non_checkpoint_epochs.clone(),
734                                    epoch_log.checkpoint_epoch,
735                                )
736                            })
737                            .collect_vec();
738                        let first_epochs = epochs.first();
739                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
740                            && *first_checkpoint_epoch == downstream_committed_epoch
741                        {
742                        } else {
743                            bail!(
744                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
745                                epochs,
746                                upstream_table_id,
747                                downstream_committed_epoch
748                            );
749                        }
750                        log_epochs
751                            .try_insert(upstream_table_id, epochs)
752                            .expect("non-duplicated");
753                    } else {
754                        bail!(
755                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
756                            upstream_table_id,
757                            upstream_committed_epoch,
758                            downstream_committed_epoch
759                        );
760                    }
761                }
762            }
763        }
764        Ok((table_committed_epoch, log_epochs))
765    }
766
767    pub(super) async fn reload_runtime_info_impl(
768        &self,
769    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
770        {
771            {
772                {
773                    self.clean_dirty_streaming_jobs(None)
774                        .await
775                        .context("clean dirty streaming jobs")?;
776
777                    self.reset_sink_coordinator(None)
778                        .await
779                        .context("reset sink coordinator")?;
780                    self.abort_dirty_pending_sink_state(None)
781                        .await
782                        .context("abort dirty pending sink state")?;
783
784                    // Background job progress needs to be recovered.
785                    tracing::info!("recovering background job progress");
786                    let mut initial_background_jobs = self
787                        .list_background_job_progress(None)
788                        .await
789                        .context("recover background job progress should not fail")?;
790
791                    tracing::info!("recovered background job progress");
792
793                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
794                    let _ = self.apply_pre_applied_drop_cancel(None).await?;
795                    self.metadata_manager
796                        .catalog_controller
797                        .cleanup_dropped_tables()
798                        .await;
799
800                    let active_streaming_nodes =
801                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
802                            .await?;
803
804                    let background_streaming_jobs =
805                        initial_background_jobs.iter().cloned().collect_vec();
806
807                    tracing::info!(
808                        "background streaming jobs: {:?} total {}",
809                        background_streaming_jobs,
810                        background_streaming_jobs.len()
811                    );
812
813                    let unreschedulable_jobs = {
814                        let mut unreschedulable_jobs = HashSet::new();
815
816                        for job_id in background_streaming_jobs {
817                            let scan_types = self
818                                .metadata_manager
819                                .get_job_backfill_scan_types(job_id)
820                                .await?;
821
822                            if scan_types
823                                .values()
824                                .any(|scan_type| !scan_type.is_reschedulable(false))
825                            {
826                                unreschedulable_jobs.insert(job_id);
827                            }
828                        }
829
830                        unreschedulable_jobs
831                    };
832
833                    if !unreschedulable_jobs.is_empty() {
834                        info!(
835                            "unreschedulable background jobs: {:?}",
836                            unreschedulable_jobs
837                        );
838                    }
839
840                    // Resolve actor info for recovery. If there's no actor to recover, most of the
841                    // following steps will be no-op, while the compute nodes will still be reset.
842                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
843                    if !unreschedulable_jobs.is_empty() {
844                        bail!(
845                            "Recovery for unreschedulable background jobs is not yet implemented. \
846                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
847                            unreschedulable_jobs
848                        );
849                    }
850
851                    let mut recovery_context = self.load_recovery_context(None).await?;
852                    if self.apply_pre_applied_drop_cancel(None).await? {
853                        recovery_context = self.load_recovery_context(None).await?;
854                    }
855
856                    self.purge_state_table_from_hummock(
857                        &recovery_context
858                            .fragment_context
859                            .job_fragments
860                            .values()
861                            .flat_map(|fragments| fragments.values())
862                            .flat_map(|fragment| fragment.state_table_ids.iter().copied())
863                            .collect(),
864                    )
865                    .await
866                    .context("purge state table from hummock")?;
867
868                    let (state_table_committed_epochs, state_table_log_epochs) = self
869                        .hummock_manager
870                        .on_current_version_and_table_change_log(|version, table_change_log| {
871                            Self::resolve_hummock_version_epochs(
872                                recovery_context
873                                    .fragment_context
874                                    .job_fragments
875                                    .iter()
876                                    .filter_map(|(job_id, job)| {
877                                        initial_background_jobs
878                                            .contains(job_id)
879                                            .then_some((*job_id, job))
880                                    }),
881                                version,
882                                table_change_log,
883                            )
884                        })
885                        .await?;
886
887                    self.finish_completed_batch_refresh_background_jobs(
888                        &recovery_context,
889                        &state_table_committed_epochs,
890                        &mut initial_background_jobs,
891                    )
892                    .await?;
893
894                    let mv_depended_subscriptions = self
895                        .metadata_manager
896                        .get_mv_depended_subscriptions(None)
897                        .await?;
898
899                    // Refresh background job progress for the final snapshot to reflect any catalog changes.
900                    let background_jobs = {
901                        let mut refreshed_background_jobs = self
902                            .list_background_job_progress(None)
903                            .await
904                            .context("recover background job progress should not fail")?;
905                        recovery_context
906                            .fragment_context
907                            .job_map
908                            .keys()
909                            .filter_map(|job_id| {
910                                refreshed_background_jobs.remove(job_id).then_some(*job_id)
911                            })
912                            .collect()
913                    };
914
915                    let database_infos = self
916                        .metadata_manager
917                        .catalog_controller
918                        .list_databases()
919                        .await?;
920
921                    let cdc_table_snapshot_splits =
922                        reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
923                            .await?;
924
925                    Ok(BarrierWorkerRuntimeInfoSnapshot {
926                        active_streaming_nodes,
927                        recovery_context,
928                        state_table_committed_epochs,
929                        state_table_log_epochs,
930                        mv_depended_subscriptions,
931                        background_jobs,
932                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
933                        database_infos,
934                        cdc_table_snapshot_splits,
935                    })
936                }
937            }
938        }
939    }
940
941    pub(super) async fn reload_database_runtime_info_impl(
942        &self,
943        database_id: DatabaseId,
944    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
945        self.clean_dirty_streaming_jobs(Some(database_id))
946            .await
947            .context("clean dirty streaming jobs")?;
948
949        self.reset_sink_coordinator(Some(database_id))
950            .await
951            .context("reset sink coordinator")?;
952        self.abort_dirty_pending_sink_state(Some(database_id))
953            .await
954            .context("abort dirty pending sink state")?;
955
956        // Background job progress needs to be recovered.
957        tracing::info!(
958            ?database_id,
959            "recovering background job progress of database"
960        );
961
962        let mut background_jobs = self
963            .list_background_job_progress(Some(database_id))
964            .await
965            .context("recover background job progress of database should not fail")?;
966        tracing::info!(?database_id, "recovered background job progress");
967
968        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
969        let _ = self
970            .apply_pre_applied_drop_cancel(Some(database_id))
971            .await?;
972
973        let recovery_context = self.load_recovery_context(Some(database_id)).await?;
974
975        let missing_background_jobs = background_jobs
976            .iter()
977            .filter(|job_id| {
978                !recovery_context
979                    .fragment_context
980                    .job_map
981                    .contains_key(*job_id)
982            })
983            .copied()
984            .collect_vec();
985        if !missing_background_jobs.is_empty() {
986            warn!(
987                database_id = %database_id,
988                missing_job_ids = ?missing_background_jobs,
989                "background jobs missing in rendered info"
990            );
991        }
992
993        let (state_table_committed_epochs, state_table_log_epochs) = self
994            .hummock_manager
995            .on_current_version_and_table_change_log(|version, table_change_log| {
996                Self::resolve_hummock_version_epochs(
997                    background_jobs.iter().filter_map(|job_id| {
998                        recovery_context
999                            .fragment_context
1000                            .job_fragments
1001                            .get(job_id)
1002                            .map(|job| (*job_id, job))
1003                    }),
1004                    version,
1005                    table_change_log,
1006                )
1007            })
1008            .await?;
1009
1010        self.finish_completed_batch_refresh_background_jobs(
1011            &recovery_context,
1012            &state_table_committed_epochs,
1013            &mut background_jobs,
1014        )
1015        .await?;
1016
1017        let mv_depended_subscriptions = self
1018            .metadata_manager
1019            .get_mv_depended_subscriptions(Some(database_id))
1020            .await?;
1021
1022        let cdc_table_snapshot_splits =
1023            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
1024                .await?;
1025
1026        self.refresh_manager
1027            .remove_trackers_by_database(database_id);
1028
1029        Ok(DatabaseRuntimeInfoSnapshot {
1030            recovery_context,
1031            state_table_committed_epochs,
1032            state_table_log_epochs,
1033            mv_depended_subscriptions,
1034            background_jobs,
1035            cdc_table_snapshot_splits,
1036        })
1037    }
1038}
1039
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_common::id::WorkerId;
    use risingwave_meta_model::DispatcherType;
    use risingwave_meta_model::fragment::DistributionType;
    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
    use risingwave_pb::stream_plan::{
        PbDispatchOutputMapping, PbStreamNode, UpstreamSinkUnionNode as PbUpstreamSinkUnionNode,
    };

    use super::*;
    use crate::controller::fragment::InflightActorInfo;
    use crate::model::DownstreamFragmentRelation;
    use crate::stream::UpstreamSinkInfo;

    /// `recovery_table_with_upstream_sinks` must fill the `init_upstreams` of the target
    /// fragment's upstream-sink union node from the recovery info of the job.
    #[test]
    fn test_recovery_table_with_upstream_sinks_updates_union_node() {
        let database_id = DatabaseId::new(1);
        let job_id = JobId::new(10);
        let fragment_id = FragmentId::new(100);
        let sink_fragment_id = FragmentId::new(200);

        // Target fragment: a bare upstream-sink union node without any upstream yet.
        let union_node = PbStreamNode {
            node_body: Some(PbNodeBody::UpstreamSinkUnion(Box::new(
                PbUpstreamSinkUnionNode {
                    init_upstreams: vec![],
                },
            ))),
            ..Default::default()
        };
        let target_fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: union_node,
            actors: HashMap::new(),
            state_table_ids: HashSet::new(),
        };

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, target_fragment);

        // One upstream sink, living in `sink_fragment_id`, feeding the target fragment.
        let upstream_sink = UpstreamSinkInfo {
            sink_id: SinkId::new(1),
            sink_fragment_id,
            sink_output_fields: vec![],
            sink_original_target_columns: vec![],
            project_exprs: vec![],
            new_sink_downstream: DownstreamFragmentRelation {
                downstream_fragment_id: FragmentId::new(300),
                dispatcher_type: DispatcherType::Hash,
                dist_key_indices: vec![],
                output_mapping: PbDispatchOutputMapping::default(),
            },
        };
        let mut upstream_sink_recovery = HashMap::new();
        upstream_sink_recovery.insert(
            job_id,
            UpstreamSinkRecoveryInfo {
                target_fragment_id: fragment_id,
                upstream_infos: vec![upstream_sink],
            },
        );

        recovery_table_with_upstream_sinks(&mut inflight_jobs, &upstream_sink_recovery).unwrap();

        let updated = inflight_jobs
            .get(&database_id)
            .and_then(|jobs| jobs.get(&job_id))
            .and_then(|fragments| fragments.get(&fragment_id))
            .unwrap();

        let updated_union = match updated.nodes.node_body.as_ref().unwrap() {
            PbNodeBody::UpstreamSinkUnion(union_node) => union_node,
            _ => panic!("expected upstream sink union node"),
        };

        assert_eq!(updated_union.init_upstreams.len(), 1);
        let first_upstream = &updated_union.init_upstreams[0];
        assert_eq!(
            first_upstream.upstream_fragment_id,
            sink_fragment_id.as_raw_id()
        );
    }

    /// `build_stream_actors` must populate each actor from the preloaded
    /// `StreamingJobExtraInfo` of its job (definition, config override and time zone).
    #[test]
    fn test_build_stream_actors_uses_preloaded_extra_info() {
        let database_id = DatabaseId::new(2);
        let job_id = JobId::new(20);
        let fragment_id = FragmentId::new(120);
        let actor_id = ActorId::new(500);

        // A fragment with a single actor placed on worker 1.
        let single_actor = InflightActorInfo {
            worker_id: WorkerId::new(1),
            vnode_bitmap: None,
            splits: vec![],
        };
        let fragment = InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 1,
            nodes: PbStreamNode::default(),
            actors: HashMap::from([(actor_id, single_actor)]),
            state_table_ids: HashSet::new(),
        };

        let mut inflight_jobs: FragmentRenderMap = HashMap::new();
        inflight_jobs
            .entry(database_id)
            .or_default()
            .entry(job_id)
            .or_default()
            .insert(fragment_id, fragment);

        let mut job_extra_info = HashMap::new();
        job_extra_info.insert(
            job_id,
            StreamingJobExtraInfo {
                timezone: Some("UTC".to_owned()),
                config_override: "cfg".into(),
                job_definition: "definition".to_owned(),
                backfill_orders: None,
                refresh_interval_sec: None,
            },
        );

        let stream_actors = build_stream_actors(&inflight_jobs, &job_extra_info).unwrap();
        let actor = stream_actors.get(&actor_id).unwrap();

        // Identity of the actor.
        assert_eq!(actor.actor_id, actor_id);
        assert_eq!(actor.fragment_id, fragment_id);

        // Values taken over from the preloaded extra info.
        assert_eq!(actor.mview_definition, "definition");
        assert_eq!(&*actor.config_override, "cfg");
        let expr_ctx = actor.expr_context.as_ref().unwrap();
        assert_eq!(expr_ctx.time_zone, "UTC");
    }
}