risingwave_meta/barrier/context/recovery.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, max, min};
16use std::collections::hash_map::Entry;
17use std::collections::{BTreeSet, HashMap, HashSet};
18use std::time::Duration;
19
20use anyhow::{Context, anyhow};
21use futures::future::try_join_all;
22use itertools::Itertools;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::config::DefaultParallelism;
25use risingwave_common::hash::WorkerSlotId;
26use risingwave_hummock_sdk::version::HummockVersion;
27use risingwave_meta_model::StreamingParallelism;
28use thiserror_ext::AsReport;
29use tokio::time::Instant;
30use tracing::{debug, info, warn};
31
32use super::BarrierWorkerRuntimeInfoSnapshot;
33use crate::barrier::context::GlobalBarrierWorkerContextImpl;
34use crate::barrier::info::InflightStreamingJobInfo;
35use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
36use crate::manager::ActiveStreamingWorkerNodes;
37use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
38use crate::stream::{
39    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
40    RescheduleOptions, SourceChange, StreamFragmentGraph,
41};
42use crate::{MetaResult, model};
43
44impl GlobalBarrierWorkerContextImpl {
45    /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted.
46    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
47        let database_id = database_id.map(|database_id| database_id.database_id as _);
48        self.metadata_manager
49            .catalog_controller
50            .clean_dirty_subscription(database_id)
51            .await?;
52        let dirty_associated_source_ids = self
53            .metadata_manager
54            .catalog_controller
55            .clean_dirty_creating_jobs(database_id)
56            .await?;
57
58        // unregister cleaned sources.
59        self.source_manager
60            .apply_source_change(SourceChange::DropSource {
61                dropped_source_ids: dirty_associated_source_ids,
62            })
63            .await;
64
65        Ok(())
66    }
67
68    async fn purge_state_table_from_hummock(
69        &self,
70        all_state_table_ids: &HashSet<TableId>,
71    ) -> MetaResult<()> {
72        self.hummock_manager.purge(all_state_table_ids).await?;
73        Ok(())
74    }
75
76    async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
77        let mgr = &self.metadata_manager;
78        let mviews = mgr
79            .catalog_controller
80            .list_background_creating_mviews(false)
81            .await?;
82
83        try_join_all(mviews.into_iter().map(|mview| async move {
84            let table_id = TableId::new(mview.table_id as _);
85            let stream_job_fragments = mgr
86                .catalog_controller
87                .get_job_fragments_by_id(mview.table_id)
88                .await?;
89            assert_eq!(stream_job_fragments.stream_job_id(), table_id);
90            Ok((mview.definition, stream_job_fragments))
91        }))
92        .await
93        // If failed, enter recovery mode.
94    }
95
96    /// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
97    /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor
98    /// will create or drop before this barrier flow through them.
99    async fn resolve_graph_info(
100        &self,
101        database_id: Option<DatabaseId>,
102    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
103        let database_id = database_id.map(|database_id| database_id.database_id as _);
104        let all_actor_infos = self
105            .metadata_manager
106            .catalog_controller
107            .load_all_actors(database_id)
108            .await?;
109
110        Ok(all_actor_infos
111            .into_iter()
112            .map(|(loaded_database_id, job_fragment_infos)| {
113                if let Some(database_id) = database_id {
114                    assert_eq!(database_id, loaded_database_id);
115                }
116                (
117                    DatabaseId::new(loaded_database_id as _),
118                    job_fragment_infos
119                        .into_iter()
120                        .map(|(job_id, fragment_infos)| {
121                            let job_id = TableId::new(job_id as _);
122                            (
123                                job_id,
124                                InflightStreamingJobInfo {
125                                    job_id,
126                                    fragment_infos: fragment_infos
127                                        .into_iter()
128                                        .map(|(fragment_id, info)| (fragment_id as _, info))
129                                        .collect(),
130                                },
131                            )
132                        })
133                        .collect(),
134                )
135            })
136            .collect())
137    }
138
139    #[expect(clippy::type_complexity)]
140    fn resolve_hummock_version_epochs(
141        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
142        version: &HummockVersion,
143    ) -> MetaResult<(
144        HashMap<TableId, u64>,
145        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
146    )> {
147        let table_committed_epoch: HashMap<_, _> = version
148            .state_table_info
149            .info()
150            .iter()
151            .map(|(table_id, info)| (*table_id, info.committed_epoch))
152            .collect();
153        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
154            Ok(*table_committed_epoch
155                .get(&table_id)
156                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
157        };
158        let mut min_downstream_committed_epochs = HashMap::new();
159        for (_, job) in background_jobs.values() {
160            let job_committed_epoch = get_table_committed_epoch(job.stream_job_id)?;
161            if let (Some(snapshot_backfill_info), _) =
162                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
163                    job.fragments()
164                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
165                )?
166            {
167                for (upstream_table, snapshot_epoch) in
168                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
169                {
170                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
171                        anyhow!(
172                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
173                            job
174                        )
175                    })?;
176                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
177                    match min_downstream_committed_epochs.entry(upstream_table) {
178                        Entry::Occupied(entry) => {
179                            let prev_min_epoch = entry.into_mut();
180                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
181                        }
182                        Entry::Vacant(entry) => {
183                            entry.insert(pinned_epoch);
184                        }
185                    }
186                }
187            }
188        }
189        let mut log_epochs = HashMap::new();
190        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
191            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
192            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
193                Ordering::Less => {
194                    return Err(anyhow!(
195                        "downstream epoch {} later than upstream epoch {} of table {}",
196                        downstream_committed_epoch,
197                        upstream_committed_epoch,
198                        upstream_table_id
199                    )
200                    .into());
201                }
202                Ordering::Equal => {
203                    continue;
204                }
205                Ordering::Greater => {
206                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
207                    {
208                        let epochs = table_change_log
209                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
210                            .map(|epoch_log| {
211                                (
212                                    epoch_log.non_checkpoint_epochs.clone(),
213                                    epoch_log.checkpoint_epoch,
214                                )
215                            })
216                            .collect_vec();
217                        let first_epochs = epochs.first();
218                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
219                            && *first_checkpoint_epoch == downstream_committed_epoch
220                        {
221                        } else {
222                            return Err(anyhow!(
223                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
224                                epochs, upstream_table_id, downstream_committed_epoch).into()
225                            );
226                        }
227                        log_epochs
228                            .try_insert(upstream_table_id, epochs)
229                            .expect("non-duplicated");
230                    } else {
231                        return Err(anyhow!(
232                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
233                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
234                        );
235                    }
236                }
237            }
238        }
239        Ok((table_committed_epoch, log_epochs))
240    }
241
242    pub(super) async fn reload_runtime_info_impl(
243        &self,
244    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
245        {
246            {
247                {
248                    self.clean_dirty_streaming_jobs(None)
249                        .await
250                        .context("clean dirty streaming jobs")?;
251
252                    // Mview progress needs to be recovered.
253                    tracing::info!("recovering mview progress");
254                    let background_jobs = {
255                        let jobs = self
256                            .list_background_mv_progress()
257                            .await
258                            .context("recover mview progress should not fail")?;
259                        let mut background_jobs = HashMap::new();
260                        for (definition, stream_job_fragments) in jobs {
261                            if stream_job_fragments
262                                .tracking_progress_actor_ids()
263                                .is_empty()
264                            {
265                                // If there's no tracking actor in the mview, we can finish the job directly.
266                                self.metadata_manager
267                                    .catalog_controller
268                                    .finish_streaming_job(
269                                        stream_job_fragments.stream_job_id().table_id as _,
270                                        None,
271                                    )
272                                    .await?;
273                            } else {
274                                background_jobs
275                                    .try_insert(
276                                        stream_job_fragments.stream_job_id(),
277                                        (definition, stream_job_fragments),
278                                    )
279                                    .expect("non-duplicate");
280                            }
281                        }
282                        background_jobs
283                    };
284
285                    tracing::info!("recovered mview progress");
286
287                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
288                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
289
290                    let mut active_streaming_nodes =
291                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
292                            .await?;
293
294                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
295                    info!(
296                        "background streaming jobs: {:?} total {}",
297                        background_streaming_jobs,
298                        background_streaming_jobs.len()
299                    );
300
301                    let unreschedulable_jobs = {
302                        let mut unreschedulable_jobs = HashSet::new();
303
304                        for job_id in background_streaming_jobs {
305                            let scan_types = self
306                                .metadata_manager
307                                .get_job_backfill_scan_types(&job_id)
308                                .await?;
309
310                            if scan_types
311                                .values()
312                                .any(|scan_type| !scan_type.is_reschedulable())
313                            {
314                                unreschedulable_jobs.insert(job_id);
315                            }
316                        }
317
318                        unreschedulable_jobs
319                    };
320
321                    if !unreschedulable_jobs.is_empty() {
322                        tracing::info!(
323                            "unreschedulable background jobs: {:?}",
324                            unreschedulable_jobs
325                        );
326                    }
327
328                    // Resolve actor info for recovery. If there's no actor to recover, most of the
329                    // following steps will be no-op, while the compute nodes will still be reset.
330                    // FIXME: Transactions should be used.
331                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
332                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
333                        && unreschedulable_jobs.is_empty()
334                    {
335                        info!("trigger offline scaling");
336                        self.scale_actors(&active_streaming_nodes)
337                            .await
338                            .inspect_err(|err| {
339                                warn!(error = %err.as_report(), "scale actors failed");
340                            })?;
341
342                        self.resolve_graph_info(None).await.inspect_err(|err| {
343                            warn!(error = %err.as_report(), "resolve actor info failed");
344                        })?
345                    } else {
346                        info!("trigger actor migration");
347                        // Migrate actors in expired CN to newly joined one.
348                        self.migrate_actors(&mut active_streaming_nodes)
349                            .await
350                            .inspect_err(|err| {
351                                warn!(error = %err.as_report(), "migrate actors failed");
352                            })?
353                    };
354
355                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
356                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
357                            warn!(error = %err.as_report(), "resolve actor info failed");
358                        })?
359                    }
360
361                    let info = info;
362
363                    self.purge_state_table_from_hummock(
364                        &info
365                            .values()
366                            .flatten()
367                            .flat_map(|(_, job)| job.existing_table_ids())
368                            .collect(),
369                    )
370                    .await
371                    .context("purge state table from hummock")?;
372
373                    let (state_table_committed_epochs, state_table_log_epochs) = self
374                        .hummock_manager
375                        .on_current_version(|version| {
376                            Self::resolve_hummock_version_epochs(&background_jobs, version)
377                        })
378                        .await?;
379
380                    let subscription_infos = self
381                        .metadata_manager
382                        .get_mv_depended_subscriptions(None)
383                        .await?
384                        .into_iter()
385                        .map(|(database_id, mv_depended_subscriptions)| {
386                            (
387                                database_id,
388                                InflightSubscriptionInfo {
389                                    mv_depended_subscriptions,
390                                },
391                            )
392                        })
393                        .collect();
394
395                    // update and build all actors.
396                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
397                        warn!(error = %err.as_report(), "update actors failed");
398                    })?;
399
400                    let fragment_relations = self
401                        .metadata_manager
402                        .catalog_controller
403                        .get_fragment_downstream_relations(
404                            info.values()
405                                .flatten()
406                                .flat_map(|(_, job)| job.fragment_infos())
407                                .map(|fragment| fragment.fragment_id as _)
408                                .collect(),
409                        )
410                        .await?;
411
412                    let background_jobs = {
413                        let jobs = self
414                            .list_background_mv_progress()
415                            .await
416                            .context("recover mview progress should not fail")?;
417                        let mut background_jobs = HashMap::new();
418                        for (definition, stream_job_fragments) in jobs {
419                            background_jobs
420                                .try_insert(
421                                    stream_job_fragments.stream_job_id(),
422                                    (definition, stream_job_fragments),
423                                )
424                                .expect("non-duplicate");
425                        }
426                        background_jobs
427                    };
428
429                    let database_infos = self
430                        .metadata_manager
431                        .catalog_controller
432                        .list_databases()
433                        .await?;
434
435                    // get split assignments for all actors
436                    let source_splits = self.source_manager.list_assignments().await;
437                    Ok(BarrierWorkerRuntimeInfoSnapshot {
438                        active_streaming_nodes,
439                        database_job_infos: info,
440                        state_table_committed_epochs,
441                        state_table_log_epochs,
442                        subscription_infos,
443                        stream_actors,
444                        fragment_relations,
445                        source_splits,
446                        background_jobs,
447                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
448                        database_infos,
449                    })
450                }
451            }
452        }
453    }
454
    /// Reload the runtime info snapshot for a single database (per-database recovery).
    /// Returns `Ok(None)` when no inflight job info is resolved for `database_id`.
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Mview progress needs to be recovered.
        tracing::info!(?database_id, "recovering mview progress of database");
        // NOTE: this lists background jobs across databases; entries are filtered
        // against this database's resolved `info` below.
        let background_jobs = self
            .list_background_mv_progress()
            .await
            .context("recover mview progress of database should not fail")?;
        tracing::info!(?database_id, "recovered mview progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        // With a specific database requested, at most that one entry can come back.
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            // Nothing to recover for this database.
            return Ok(None);
        };

        // Re-key background jobs by job id, keeping only jobs present in this
        // database's graph info; jobs with no tracking actors are finished directly.
        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the mview, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        // get split assignments for all actors
        let source_splits = self.source_manager.list_assignments().await;
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
        }))
    }
574}
575
576impl GlobalBarrierWorkerContextImpl {
    // Migration timeout: upper bound on waiting for replacement workers in
    // `migrate_actors` before force-extending the plan onto oversubscribed slots.
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);
579
    /// Migrate actors in expired CNs to newly joined ones, then return the freshly
    /// resolved per-database graph info (via `resolve_graph_info`).
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        // all worker slots used by actors
        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        // all slots offered by the currently active compute nodes
        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        // in-use slots whose worker is gone: these must be migrated away
        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        // slots both in use and still active; migration targets must avoid these
        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        // migration plan: expired source slot -> replacement target slot
        let mut plan = HashMap::new();
        'discovery: while !to_migrate_worker_slots.is_empty() {
            // candidate targets: active slots that are not already occupied
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            // Past the force-migration timeout with insufficient capacity:
            // oversubscribe by synthesizing extra slots per worker (factor 2, 4, 8, ...)
            // until every expired slot has a potential target.
            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    // order by slot index first, then worker id
                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        // every expired slot already has a target; planning complete
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // wait to get newly joined CN
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "get worker changed or timed out. Retry migrate");
        }

        info!("migration plan {:?}", plan);

        // persist the plan: rewrite actor placements in the catalog
        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }
725
    /// Rebalance all streaming jobs onto the currently-active compute nodes
    /// ("offline scaling" during recovery).
    ///
    /// Steps:
    /// 1. Acquire the reschedule write lock without blocking; recovery fails fast
    ///    if a concurrent reschedule holds it.
    /// 2. Run the scale controller's integrity check on streaming metadata.
    /// 3. For every streaming job, derive a target parallelism from its assigned
    ///    parallelism, its actual core-fragment parallelism, and the cluster's
    ///    total available parallelism (see `derive_target_parallelism`).
    /// 4. Generate, analyze, and post-apply reschedule plans in bounded batches.
    ///
    /// Returns `Ok(())` once every batch has been applied; any failure aborts
    /// the whole scaling pass.
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        // Non-blocking acquisition: if another reschedule is in flight we bail
        // out immediately rather than wait, letting recovery retry later.
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        // Sanity-check streaming metadata before touching any distribution.
        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        // Workers eligible as scheduling targets (only those marked schedulable
        // for streaming).
        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        // Total parallelism summed over ALL active nodes.
        // NOTE(review): unlike `available_workers` above, this sum does not
        // filter on `is_streaming_schedulable()` — confirm that counting
        // unschedulable nodes' parallelism here is intended.
        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        // table_id -> target parallelism, filled while building the reschedule
        // targets and later used to cross-check `analyze_reschedule_plan`.
        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                // May be `None` when the job's fragment parallelism cannot be
                // determined from the catalog.
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                // Map the persisted parallelism setting to the in-memory model.
                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                // Log only actual changes to keep recovery logs readable.
                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        // Offline scaling only adjusts parallelism; resource
                        // groups are left as-is.
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        // Process jobs in batches to bound the size of each generated plan;
        // `.max(1)` guards against a misconfigured batch size of 0.
        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: local_reschedule_targets,
                })
                .await?;

            // no need to update
            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
                continue;
            };

            // Cloned so `analyze_reschedule_plan` can mutate its copy; compared
            // against the original below to assert nothing was rewritten.
            let mut compared_table_parallelisms = table_parallelisms.clone();

            // skip reschedule if no reschedule is generated.
            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            // Recovery re-creates actors itself; the analysis
                            // must not spawn new ones here.
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }
888
889    // We infer the new parallelism strategy based on the prior level of parallelism of the table.
890    // If the parallelism strategy is Fixed or Auto, we won't make any modifications.
891    // For Custom, we'll assess the parallelism of the core fragment;
892    // if the parallelism is higher than the currently available parallelism, we'll set it to Adaptive.
893    // If it's lower, we'll set it to Fixed.
894    // If it was previously set to Adaptive, but the default_parallelism in the configuration isn’t Full,
895    // and it matches the actual fragment parallelism, in this case, it will be handled by downgrading to Fixed.
896    fn derive_target_parallelism(
897        available_parallelism: usize,
898        assigned_parallelism: TableParallelism,
899        actual_fragment_parallelism: Option<usize>,
900        default_parallelism: DefaultParallelism,
901    ) -> TableParallelism {
902        match assigned_parallelism {
903            TableParallelism::Custom => {
904                if let Some(fragment_parallelism) = actual_fragment_parallelism {
905                    if fragment_parallelism >= available_parallelism {
906                        TableParallelism::Adaptive
907                    } else {
908                        TableParallelism::Fixed(fragment_parallelism)
909                    }
910                } else {
911                    TableParallelism::Adaptive
912                }
913            }
914            TableParallelism::Adaptive => {
915                match (default_parallelism, actual_fragment_parallelism) {
916                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
917                        if fragment_parallelism == n.get() =>
918                    {
919                        TableParallelism::Fixed(fragment_parallelism)
920                    }
921                    _ => TableParallelism::Adaptive,
922                }
923            }
924            _ => assigned_parallelism,
925        }
926    }
927
928    /// Update all actors in compute nodes.
929    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
930        self.metadata_manager.all_active_actors().await
931    }
932}
933
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    /// Exercises every branch of `derive_target_parallelism`:
    /// Custom (fits / equals / exceeds / unknown), Fixed passthrough, and
    /// Adaptive (default Full, default matching, default not matching).
    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual unknown, default full -> adaptive
        // (comment previously mis-described this case as "assigned fixed(5) -> fixed(5)")
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned fixed(5), actual _, default full -> fixed(5)
        // (Fixed is passed through untouched)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Fixed(5),
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual _, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}