risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs of creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

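    // Note on the helper below, under the `purge` semantics assumed here: the set passed in is
    // treated as the complete set of live state tables, and state tables outside this set are
    // dropped from the current Hummock version during recovery.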
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

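    /// List the definitions and fragments of materialized views that are still being created in
    /// the background, so that their creation progress can be re-tracked after recovery.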
    async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let mviews = mgr
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        try_join_all(mviews.into_iter().map(|mview| async move {
            let table_id = TableId::new(mview.table_id as _);
            let stream_job_fragments = mgr
                .catalog_controller
                .get_job_fragments_by_id(mview.table_id)
                .await?;
            assert_eq!(stream_job_fragments.stream_job_id(), table_id);
            Ok((mview.definition, stream_job_fragments))
        }))
        .await
        // If this fails, we enter recovery mode.
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to modify the set of actors to send to or collect from, because
    /// these actors will be created or dropped before this barrier flows through them.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

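    // A rough sketch of the invariant enforced by the function below: for every background
    // snapshot-backfill job `J` reading an upstream table `U`, the epoch it pins is
    //     pinned = max(snapshot_epoch(J, U), committed_epoch(J)),
    // and recovery requires committed_epoch(U) >= pinned for the smallest such `pinned` across
    // all downstreams of `U`; when the upstream has advanced further, the gap must be covered by
    // `U`'s table change log, otherwise recovery bails out with an error.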
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(HashMap<TableId, u64>, HashMap<TableId, Vec<Vec<u64>>>)> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let job_committed_epoch = get_table_committed_epoch(job.stream_job_id)?;
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| epoch_log.epochs.clone())
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some(first_epochs) = &first_epochs
                            && first_epochs.last() == Some(&downstream_committed_epoch)
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

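    /// Reload the runtime info needed to restart the barrier worker after a failure, roughly in
    /// this order: clean up dirty streaming jobs, recover background mview progress, pre-apply
    /// pending drop/cancel commands, migrate or rescale actors, purge unused state tables from
    /// Hummock, resolve committed/log epochs, and finally collect subscriptions, actors, fragment
    /// relations and source split assignments into a `BarrierWorkerRuntimeInfoSnapshot`.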
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    // Mview progress needs to be recovered.
                    tracing::info!("recovering mview progress");
                    let background_jobs = {
                        let jobs = self
                            .list_background_mv_progress()
                            .await
                            .context("recover mview progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            if stream_job_fragments
                                .tracking_progress_actor_ids()
                                .is_empty()
                            {
                                // If there's no tracking actor in the mview, we can finish the job directly.
                                self.metadata_manager
                                    .catalog_controller
                                    .finish_streaming_job(
                                        stream_job_fragments.stream_job_id().table_id as _,
                                        None,
                                    )
                                    .await?;
                            } else {
                                background_jobs
                                    .try_insert(
                                        stream_job_fragments.stream_job_id(),
                                        (definition, stream_job_fragments),
                                    )
                                    .expect("non-duplicate");
                            }
                        }
                        background_jobs
                    };

                    tracing::info!("recovered mview progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

                    let mut active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
                    info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(&job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there's no actor to recover, most of the
                    // following steps will be no-ops, while the compute nodes will still be reset.
                    // FIXME: Transactions should be used.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
                        && unreschedulable_jobs.is_empty()
                    {
                        info!("trigger offline scaling");
                        self.scale_actors(&active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "scale actors failed");
                            })?;

                        self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    } else {
                        info!("trigger actor migration");
                        // Migrate actors in expired CNs to newly joined ones.
                        self.migrate_actors(&mut active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "migrate actors failed");
                            })?
                    };

                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    }

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, job)| job.existing_table_ids())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(&background_jobs, version)
                        })
                        .await?;

                    let subscription_infos = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?
                        .into_iter()
                        .map(|(database_id, mv_depended_subscriptions)| {
                            (
                                database_id,
                                InflightSubscriptionInfo {
                                    mv_depended_subscriptions,
                                },
                            )
                        })
                        .collect();

                    // Load all active actors.
                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
                        warn!(error = %err.as_report(), "update actors failed");
                    })?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.fragment_infos())
                                .map(|fragment| fragment.fragment_id as _)
                                .collect(),
                        )
                        .await?;

                    let background_jobs = {
                        let jobs = self
                            .list_background_mv_progress()
                            .await
                            .context("recover mview progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            background_jobs
                                .try_insert(
                                    stream_job_fragments.stream_job_id(),
                                    (definition, stream_job_fragments),
                                )
                                .expect("non-duplicate");
                        }
                        background_jobs
                    };

                    // get split assignments for all actors
                    let source_splits = self.source_manager.list_assignments().await;
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        subscription_infos,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                    })
                }
            }
        }
    }

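    /// Per-database variant of [`Self::reload_runtime_info_impl`]: rebuilds the runtime info for a
    /// single database, returning `Ok(None)` when the database has no streaming job info to recover.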
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Mview progress needs to be recovered.
        tracing::info!(?database_id, "recovering mview progress of database");
        let background_jobs = self
            .list_background_mv_progress()
            .await
            .context("recover mview progress of database should not fail")?;
        tracing::info!(?database_id, "recovered mview progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the mview, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // Load all active actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        // get split assignments for all actors
        let source_splits = self.source_manager.list_assignments().await;
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
    // Migration timeout.
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

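    // A sketch of the migration plan built by `migrate_actors` below, with hypothetical worker
    // slots: actors on expired slots of a dead worker W1 are remapped one-to-one onto free slots
    // of live workers, e.g. { W1:0 -> W3:0, W1:1 -> W3:1 }. If no free slots appear within
    // `RECOVERY_FORCE_MIGRATION_TIMEOUT`, the slots of existing workers are oversubscribed by a
    // growing factor (x2, x4, ...) so that the plan can still be completed.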
    /// Migrate actors on expired CNs to newly joined ones, and return the refreshed graph info
    /// once the migration plan has been applied.
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        // all worker slots used by actors
        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // wait for newly joined CNs
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "get worker changed or timed out. Retry migrate");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }

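    // Rough outline of the offline scaling performed by `scale_actors` below: take the reschedule
    // lock, run an integrity check, derive a target parallelism per streaming job (see
    // `derive_target_parallelism`), then process the jobs in batches of
    // `parallelism_control_batch_size`: generate a reschedule plan, analyze it without creating
    // new actors (`skip_create_new_actors: true`), and post-apply the resulting updates.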
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: local_reschedule_targets,
                })
                .await?;

            // no need to update
            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            // skip reschedule if no reschedule is generated.
            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Since no custom parallelism is involved here, this call will not rewrite table parallelisms through no-shuffle resolution.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }

    // Infer the new parallelism strategy from the parallelism previously assigned to the table.
    // A Fixed assignment is kept unchanged.
    // For Custom, we look at the parallelism of the core fragment: if it is at least the currently
    // available parallelism (or unknown), we switch to Adaptive; otherwise we pin it to Fixed at
    // that fragment parallelism.
    // For Adaptive, if `default_parallelism` in the configuration is not Full and matches the
    // actual fragment parallelism, we downgrade to Fixed at that value; otherwise it stays Adaptive.
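    //
    // For example (illustrative numbers only): with 10 slots available, a Custom job whose core
    // fragment runs 5 actors becomes Fixed(5), while one running 12 actors becomes Adaptive; an
    // Adaptive job under `default_parallelism = 5` whose fragments already run at parallelism 5
    // is downgraded to Fixed(5).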
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

    /// Load all active actors from the metadata manager.
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual _, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual _, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}