risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs of creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

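    /// Purge state tables from Hummock based on the given set of existing state table ids
    /// (delegates to `HummockManager::purge`).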
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

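    /// List the definitions and stream job fragments of all background-creating materialized
    /// views, so that their creation progress can be recovered.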
    async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let mviews = mgr
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        try_join_all(mviews.into_iter().map(|mview| async move {
            let table_id = TableId::new(mview.table_id as _);
            let stream_job_fragments = mgr
                .catalog_controller
                .get_job_fragments_by_id(mview.table_id)
                .await?;
            assert_eq!(stream_job_fragments.stream_job_id(), table_id);
            Ok((mview.definition, stream_job_fragments))
        }))
        .await
        // If failed, enter recovery mode.
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to modify the actors to be sent or collected, because these
    /// actors will be created or dropped before this barrier flows through them.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

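    /// Resolve, from the given Hummock version, the committed epoch of every state table, plus the
    /// change-log epochs between the downstream and upstream committed epochs for upstream tables
    /// of recovered snapshot-backfill jobs.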
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let job_committed_epoch = get_table_committed_epoch(job.stream_job_id)?;
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
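                        // The first resolved log entry is expected to be checkpointed exactly at
                        // the downstream committed epoch; otherwise the change log cannot cover
                        // the downstream's gap and we bail out below.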
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    // Mview progress needs to be recovered.
                    tracing::info!("recovering mview progress");
                    let background_jobs = {
                        let jobs = self
                            .list_background_mv_progress()
                            .await
                            .context("recover mview progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            if stream_job_fragments
                                .tracking_progress_actor_ids()
                                .is_empty()
                            {
                                // If there's no tracking actor in the mview, we can finish the job directly.
                                self.metadata_manager
                                    .catalog_controller
                                    .finish_streaming_job(
                                        stream_job_fragments.stream_job_id().table_id as _,
                                        None,
                                    )
                                    .await?;
                            } else {
                                background_jobs
                                    .try_insert(
                                        stream_job_fragments.stream_job_id(),
                                        (definition, stream_job_fragments),
                                    )
                                    .expect("non-duplicate");
                            }
                        }
                        background_jobs
                    };

                    tracing::info!("recovered mview progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

                    let mut active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
                    info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(&job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there's no actor to recover, most of the
                    // following steps will be no-op, while the compute nodes will still be reset.
                    // FIXME: Transactions should be used.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
                        && unreschedulable_jobs.is_empty()
                    {
                        info!("trigger offline scaling");
                        self.scale_actors(&active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "scale actors failed");
                            })?;

                        self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    } else {
                        info!("trigger actor migration");
                        // Migrate actors on expired CNs to newly joined ones.
                        self.migrate_actors(&mut active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "migrate actors failed");
                            })?
                    };

                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    }

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, job)| job.existing_table_ids())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(&background_jobs, version)
                        })
                        .await?;

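                    // Collect subscriptions that depend on materialized views, grouped by database.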
                    let subscription_infos = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?
                        .into_iter()
                        .map(|(database_id, mv_depended_subscriptions)| {
                            (
                                database_id,
                                InflightSubscriptionInfo {
                                    mv_depended_subscriptions,
                                },
                            )
                        })
                        .collect();

                    // update and build all actors.
                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
                        warn!(error = %err.as_report(), "update actors failed");
                    })?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.fragment_infos())
                                .map(|fragment| fragment.fragment_id as _)
                                .collect(),
                        )
                        .await?;

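                    // Refresh the background job list: the set of creating jobs may have changed
                    // during the steps above (e.g. jobs finished early or dropped/canceled).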
                    let background_jobs = {
                        let jobs = self
                            .list_background_mv_progress()
                            .await
                            .context("recover mview progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            background_jobs
                                .try_insert(
                                    stream_job_fragments.stream_job_id(),
                                    (definition, stream_job_fragments),
                                )
                                .expect("non-duplicate");
                        }
                        background_jobs
                    };

                    // get split assignments for all actors
                    let source_splits = self.source_manager.list_assignments().await;
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        subscription_infos,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                    })
                }
            }
        }
    }

    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Mview progress needs to be recovered.
        tracing::info!(?database_id, "recovering mview progress of database");
        let background_jobs = self
            .list_background_mv_progress()
            .await
            .context("recover mview progress of database should not fail")?;
        tracing::info!(?database_id, "recovered mview progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
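                // Skip background jobs that do not belong to this database.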
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the mview, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        // get split assignments for all actors
        let source_splits = self.source_manager.list_assignments().await;
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
    // Maximum time to wait for new workers to join before forcing migration onto the remaining active workers.
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

    /// Migrate actors in expired CNs to newly joined ones, then return the refreshed graph info.
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        // all worker slots used by actors
        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
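                // We have waited long enough without finding sufficient free slots. Oversubscribe
                // the remaining active workers by repeatedly doubling the assumed per-worker
                // parallelism until every expired slot has a migration target.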
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // wait to get newly joined CN
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "get worker changed or timed out. Retry migrate");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }

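    /// Offline scaling during recovery: rescale all streaming jobs onto the currently available
    /// workers, resetting each job's parallelism via [`Self::derive_target_parallelism`].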
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: local_reschedule_targets,
                })
                .await?;

            // no need to update
            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            // skip reschedule if no reschedule is generated.
            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Because none of the targets uses custom parallelism, this call won't result in a no-shuffle rewrite of the table parallelisms.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }

    // We infer the new parallelism strategy based on the table's prior parallelism.
    // If the parallelism strategy is Fixed or Auto, we make no modifications.
    // For Custom, we assess the parallelism of the core fragment: if it is greater than or equal to
    // the currently available parallelism, we set it to Adaptive; if it is lower, we set it to Fixed.
    // If it was previously Adaptive but the default_parallelism in the configuration isn't Full
    // and matches the actual fragment parallelism, it is downgraded to Fixed.
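    // For example (mirroring the unit tests below): with 10 available slots, a Custom job whose
    // core fragment runs 5 actors becomes Fixed(5), while one running 10 or more becomes Adaptive;
    // an Adaptive job with default_parallelism = 5 and 5 actual actors becomes Fixed(5).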
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

    /// Load all active actors, to be updated and built on the compute nodes.
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual none, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual _, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}