risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_meta_model::StreamingParallelism;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightDatabaseInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs of creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

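    /// Purge state tables from Hummock; `all_state_table_ids` is the set of state tables that still exist in the graph.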
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

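    /// List the definition and fragments of each materialized view that is still being created in the background.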
    async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let mviews = mgr
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        try_join_all(mviews.into_iter().map(|mview| async move {
            let table_id = TableId::new(mview.table_id as _);
            let stream_job_fragments = mgr
                .catalog_controller
                .get_job_fragments_by_id(mview.table_id)
                .await?;
            assert_eq!(stream_job_fragments.stream_job_id(), table_id);
            Ok((mview.definition, stream_job_fragments))
        }))
        .await
        // If failed, enter recovery mode.
    }

    /// Resolve actor information from the cluster, the fragment manager, and `ChangedTableId`.
    /// We use `changed_table_id` to modify the actors to be sent or collected, because these actors
    /// will be created or dropped before this barrier flows through them.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, InflightDatabaseInfo>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, actor_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    InflightDatabaseInfo::new(actor_infos.into_iter().map(
                        |(job_id, actor_infos)| {
                            (
                                TableId::new(job_id as _),
                                actor_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info)),
                            )
                        },
                    )),
                )
            })
            .collect())
    }

    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        // Mview progress needs to be recovered.
        tracing::info!("recovering mview progress");
        let background_jobs = {
            let jobs = self
                .list_background_mv_progress()
                .await
                .context("recover mview progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the mview, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        tracing::info!("recovered mview progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

        let mut active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                .await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
        info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(&job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        // Resolve actor info for recovery. If there's no actor to recover, most of the
        // following steps will be no-op, while the compute nodes will still be reset.
        // FIXME: Transactions should be used.
        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
        let mut info = if !self.env.opts.disable_automatic_parallelism_control
            && unreschedulable_jobs.is_empty()
        {
            info!("trigger offline scaling");
            self.scale_actors(&active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "scale actors failed");
                })?;

            self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        } else {
            info!("trigger actor migration");
            // Migrate actors in expired CNs to newly joined ones.
            self.migrate_actors(&mut active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "migrate actors failed");
                })?
        };

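        // If any pending drop/cancel was applied, the set of jobs has changed; re-resolve the graph info.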
        if self.scheduled_barriers.pre_apply_drop_cancel(None) {
            info = self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        }

        let info = info;

        self.purge_state_table_from_hummock(
            &InflightFragmentInfo::existing_table_ids(
                info.values().flat_map(|database| database.fragment_infos()),
            )
            .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let state_table_committed_epochs: HashMap<_, _> = self
            .hummock_manager
            .on_current_version(|version| {
                version
                    .state_table_info
                    .info()
                    .iter()
                    .map(|(table_id, info)| (*table_id, info.committed_epoch))
                    .collect()
            })
            .await;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?
            .into_iter()
            .map(|(database_id, mv_depended_subscriptions)| {
                (
                    database_id,
                    InflightSubscriptionInfo {
                        mv_depended_subscriptions,
                    },
                )
            })
            .collect();

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flat_map(|database| database.fragment_infos())
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

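        // Take a fresh snapshot of background job progress: jobs may have been finished or dropped by the steps above.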
        let background_jobs = {
            let jobs = self
                .list_background_mv_progress()
                .await
                .context("recover mview progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                background_jobs
                    .try_insert(
                        stream_job_fragments.stream_job_id(),
                        (definition, stream_job_fragments),
                    )
                    .expect("non-duplicate");
            }
            background_jobs
        };

        // get split assignments for all actors
        let source_splits = self.source_manager.list_assignments().await;
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_fragment_infos: info,
            state_table_committed_epochs,
            subscription_infos,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
        })
    }

    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Mview progress needs to be recovered.
        tracing::info!(?database_id, "recovering mview progress of database");
        let background_jobs = self
            .list_background_mv_progress()
            .await
            .context("recover mview progress of database should not fail")?;
        tracing::info!(?database_id, "recovered mview progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

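        // Keep only the background jobs that belong to this database's in-flight graph;
        // jobs with no tracking actors left are finished directly.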
        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_job(stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the mview, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let state_table_committed_epochs: HashMap<_, _> = self
            .hummock_manager
            .on_current_version(|version| {
                version
                    .state_table_info
                    .info()
                    .iter()
                    .map(|(table_id, info)| (*table_id, info.committed_epoch))
                    .collect()
            })
            .await;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.fragment_infos()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        // get split assignments for all actors
        let source_splits = self.source_manager.list_assignments().await;
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            database_fragment_info: info,
            state_table_committed_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
    // Migration timeout.
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

    /// Migrate actors in expired CNs to newly joined ones and return the re-resolved graph info.
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, InflightDatabaseInfo>> {
        let mgr = &self.metadata_manager;

        // all worker slots used by actors
        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
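        // Greedily map each expired worker slot to a free slot on an active worker. If there are
        // not enough free slots, wait for new workers to join; once `RECOVERY_FORCE_MIGRATION_TIMEOUT`
        // has elapsed, fall back to oversubscribing the existing workers.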
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
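                // The timeout has elapsed and there are still not enough free slots: oversubscribe
                // by virtually doubling each worker's parallelism until every remaining slot fits.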
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // Wait for newly joined CNs.
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "get worker changed or timed out. Retry migrate");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }

    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

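        // Derive a reschedule target for every streaming job from its assigned parallelism, the
        // actual parallelism of its fragments, and the total available parallelism.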
        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

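        // Process the reschedule targets in batches of `parallelism_control_batch_size` jobs.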
        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: local_reschedule_targets,
                })
                .await?;

            // no need to update
            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            // skip reschedule if no reschedule is generated.
            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Since custom parallelism no longer exists in the targets, this call will not rewrite
            // table parallelisms via no-shuffle resolution, so `compared_table_parallelisms` should remain unchanged.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }

    // We infer the new parallelism strategy from the table's previously assigned parallelism.
    // A Fixed assignment is kept as-is.
    // For Custom, we assess the parallelism of the core fragment: if it is at least the currently
    // available parallelism (or unknown), we set the table to Adaptive; otherwise we set it to
    // Fixed at that parallelism.
    // An Adaptive assignment is also kept, unless the configured default_parallelism is not Full
    // and equals the actual fragment parallelism, in which case it is downgraded to Fixed.
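    // For example, with 10 available slots: Custom with an actual parallelism of 5 becomes Fixed(5);
    // Custom with 10 or more becomes Adaptive; Adaptive with default_parallelism = 5 and an actual
    // parallelism of 5 becomes Fixed(5). See the unit tests below.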
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

    /// Load all active actors, to be used when rebuilding actors on the compute nodes.
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual unknown, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual _, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
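
        // Additional case (not in the original test set): a Fixed assignment is passed through
        // unchanged by the catch-all arm.
        // total 10, assigned fixed(3), actual 5, default full -> fixed(3)
        assert_eq!(
            TableParallelism::Fixed(3),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Fixed(3),
                Some(5),
                DefaultParallelism::Full,
            )
        );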
    }
}