risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up the catalogs of creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

    async fn list_background_job_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        try_join_all(
            job_info
                .into_iter()
                .map(|(id, definition, _init_at)| async move {
                    let table_id = TableId::new(id as _);
                    let stream_job_fragments =
                        mgr.catalog_controller.get_job_fragments_by_id(id).await?;
                    assert_eq!(stream_job_fragments.stream_job_id(), table_id);
                    Ok((definition, stream_job_fragments))
                }),
        )
        .await
        // If failed, enter recovery mode.
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to modify the actors to be sent or collected, because these actors
    /// will be created or dropped before this barrier flows through them.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

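    /// Resolve, from the current hummock `version`, the committed epoch of every state table,
    /// plus, for background snapshot-backfill jobs that lag behind their upstream MVs, the
    /// table change-log epochs that still need to be replayed after recovery. Returns
    /// `(table_id -> committed_epoch, upstream_table_id -> [(non-checkpoint epochs, checkpoint epoch)])`.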
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
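
        // For each in-progress snapshot-backfill job, the epoch it still needs from an
        // upstream MV is the later of its snapshot epoch and its own committed epoch.
        // Keep the minimum of these pinned epochs per upstream table, since change-log
        // replay must start from the oldest epoch any downstream job depends on.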
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let Ok(job_committed_epoch) = get_table_committed_epoch(job.stream_job_id) else {
                // Question: should we get the committed epoch from any state tables in the job?
                warn!(
                    "background job {} has no committed epoch, skip resolving epochs",
                    job.stream_job_id
                );
                continue;
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
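
        // Compare each upstream table's committed epoch against the minimum epoch its
        // downstream jobs have consumed: equal means they are in sync, upstream behind
        // downstream is an inconsistency, and upstream ahead requires change-log entries
        // covering the whole gap so the downstream can catch up by replaying the log.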
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

    /// For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the
    /// newly added or deleted upstreams are not persisted in the meta store. Therefore, when restoring
    /// jobs, we need to rebuild the information required by the operator from the current state of its
    /// upstreams (sinks) and downstream (table).
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (tid, job) in table_map {
                    if acc.insert(tid.table_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate table id found: {:?}", tid).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let job_ids = jobs.keys().cloned().collect_vec();
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(job_ids.into_iter().map(|id| id as _).collect())
            .await?;
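
        // For each recovered table, locate the fragment that contains an `UpstreamSinkUnion`
        // node and refill its upstream-sink inputs from the current catalog state.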
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragments = jobs.get_mut(&table.id).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragments.fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragments
                .fragment_infos
                .get_mut(&target_fragment_id)
                .unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    // Background job progress needs to be recovered.
                    tracing::info!("recovering background job progress");
                    let background_jobs = {
                        let jobs = self
                            .list_background_job_progress()
                            .await
                            .context("recover background job progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            if stream_job_fragments
                                .tracking_progress_actor_ids()
                                .is_empty()
                            {
                                // If there's no tracking actor in the job, we can finish the job directly.
                                self.metadata_manager
                                    .catalog_controller
                                    .finish_streaming_job(
                                        stream_job_fragments.stream_job_id().table_id as _,
                                    )
                                    .await?;
                            } else {
                                background_jobs
                                    .try_insert(
                                        stream_job_fragments.stream_job_id(),
                                        (definition, stream_job_fragments),
                                    )
                                    .expect("non-duplicate");
                            }
                        }
                        background_jobs
                    };

                    tracing::info!("recovered background job progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

                    let mut active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
                    info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

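                    // Collect background jobs whose backfill scan type cannot be rescheduled;
                    // if any exist, offline scaling is skipped in favor of plain actor migration.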
                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(&job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there's no actor to recover, most of the
                    // following steps will be no-op, while the compute nodes will still be reset.
                    // FIXME: Transactions should be used.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
                        && unreschedulable_jobs.is_empty()
                    {
                        info!("trigger offline scaling");
                        self.scale_actors(&active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "scale actors failed");
                            })?;

                        self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    } else {
                        info!("trigger actor migration");
                        // Migrate actors in expired CNs to newly joined ones.
                        self.migrate_actors(&mut active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "migrate actors failed");
                            })?
                    };

                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    }

                    self.recovery_table_with_upstream_sinks(&mut info).await?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, job)| job.existing_table_ids())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

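                    // Derive per-table committed epochs, and the change-log epochs that lagging
                    // snapshot-backfill jobs still need to replay, from the current hummock version.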
                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(&background_jobs, version)
                        })
                        .await?;

                    let subscription_infos = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?
                        .into_iter()
                        .map(|(database_id, mv_depended_subscriptions)| {
                            (
                                database_id,
                                InflightSubscriptionInfo {
                                    mv_depended_subscriptions,
                                },
                            )
                        })
                        .collect();

                    // update and build all actors.
                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
                        warn!(error = %err.as_report(), "update actors failed");
                    })?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.fragment_infos())
                                .map(|fragment| fragment.fragment_id as _)
                                .collect(),
                        )
                        .await?;

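                    // Re-list background job progress, as the set of background jobs may have
                    // changed during the scaling / migration steps above.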
                    let background_jobs = {
                        let jobs = self
                            .list_background_job_progress()
                            .await
                            .context("recover background job progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            background_jobs
                                .try_insert(
                                    stream_job_fragments.stream_job_id(),
                                    (definition, stream_job_fragments),
                                )
                                .expect("non-duplicate");
                        }
                        background_jobs
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    // get split assignments for all actors
                    let mut source_splits = HashMap::new();
                    for (_, job) in info.values().flatten() {
                        for fragment in job.fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

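                    // Reassign CDC table snapshot-backfill splits to their backfill actors, and
                    // stamp any non-empty assignment with a freshly allocated generation.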
                    let cdc_table_backfill_actors = self
                        .metadata_manager
                        .catalog_controller
                        .cdc_table_backfill_actor_ids()
                        .await?;
                    let cdc_table_ids = cdc_table_backfill_actors
                        .keys()
                        .cloned()
                        .collect::<Vec<_>>();
                    let cdc_table_snapshot_split_assignment =
                        assign_cdc_table_snapshot_splits_pairs(
                            cdc_table_backfill_actors,
                            self.env.meta_store_ref(),
                            self.env.cdc_table_backfill_tracker.completed_job_ids(),
                        )
                        .await?;
                    let cdc_table_snapshot_split_assignment =
                        if cdc_table_snapshot_split_assignment.is_empty() {
                            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                        } else {
                            let generation = self
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(cdc_table_ids.into_iter());
                            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                                cdc_table_snapshot_split_assignment,
                                generation,
                            )
                        };
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        subscription_infos,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_split_assignment,
                    })
                }
            }
        }
    }

    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress()
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let mut info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

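        // Keep only the background jobs that belong to this database; jobs with no
        // tracking actors left can be finished directly.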
        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the job, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for fragment in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
    // Migration timeout.
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

    /// Migrate actors in expired CNs to newly joined ones, and return the re-resolved graph info after migration.
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        // all worker slots used by actors
        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
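
        // Repeatedly look for free worker slots to host the actors from expired slots.
        // If not enough slots show up before the force-migration timeout, virtually
        // expand each worker's slot count so that a plan can still be generated.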
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // Wait for newly joined CNs before retrying.
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "worker set changed or wait timed out, retrying migration");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }

    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

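        // Recompute the target parallelism of every streaming job from its persisted
        // parallelism policy and the parallelism currently available in the cluster.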
        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

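        // Generate and apply reschedule plans in batches to bound the size of each plan.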
        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: local_reschedule_targets,
                    },
                    false,
                )
                .await?;

            // no need to update
            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            // skip reschedule if no reschedule is generated.
            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }

    // We infer the new parallelism strategy based on the prior parallelism of the table.
    // If the strategy is Fixed, we keep it unmodified.
    // For Custom, we assess the parallelism of the core fragment:
    // if it is greater than or equal to the currently available parallelism, we set it to Adaptive;
    // if it is lower, we set it to Fixed with the actual fragment parallelism.
    // For Adaptive, it is normally kept as-is, except when the default_parallelism in the configuration
    // isn't Full and matches the actual fragment parallelism, in which case it is downgraded to Fixed.
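    //
    // For example (mirroring `test_derive_target_parallelism` below): with 10 available
    // slots, a `Custom` job whose core fragment actually runs 5 actors becomes `Fixed(5)`,
    // while one running 10 or more actors becomes `Adaptive`.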
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

    /// Update all actors in compute nodes.
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual unknown, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual _, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}