risingwave_meta/barrier/context/
recovery.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, max, min};
16use std::collections::hash_map::Entry;
17use std::collections::{BTreeSet, HashMap, HashSet};
18use std::time::Duration;
19
20use anyhow::{Context, anyhow};
21use futures::future::try_join_all;
22use itertools::Itertools;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::config::DefaultParallelism;
25use risingwave_common::hash::WorkerSlotId;
26use risingwave_hummock_sdk::version::HummockVersion;
27use risingwave_meta_model::StreamingParallelism;
28use thiserror_ext::AsReport;
29use tokio::time::Instant;
30use tracing::{debug, info, warn};
31
32use super::BarrierWorkerRuntimeInfoSnapshot;
33use crate::barrier::context::GlobalBarrierWorkerContextImpl;
34use crate::barrier::info::InflightStreamingJobInfo;
35use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
36use crate::manager::ActiveStreamingWorkerNodes;
37use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
38use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
39use crate::stream::{
40    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
41    RescheduleOptions, SourceChange, StreamFragmentGraph,
42};
43use crate::{MetaResult, model};
44
45impl GlobalBarrierWorkerContextImpl {
46    /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted.
47    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
48        let database_id = database_id.map(|database_id| database_id.database_id as _);
49        self.metadata_manager
50            .catalog_controller
51            .clean_dirty_subscription(database_id)
52            .await?;
53        let dirty_associated_source_ids = self
54            .metadata_manager
55            .catalog_controller
56            .clean_dirty_creating_jobs(database_id)
57            .await?;
58
59        // unregister cleaned sources.
60        self.source_manager
61            .apply_source_change(SourceChange::DropSource {
62                dropped_source_ids: dirty_associated_source_ids,
63            })
64            .await;
65
66        Ok(())
67    }
68
    /// Purge state tables from the Hummock manager.
    ///
    /// NOTE(review): callers pass the full set of *existing* state table ids
    /// (see `reload_runtime_info_impl`), so `purge` presumably drops everything
    /// not in this set — confirm against `HummockManager::purge`.
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }
76
77    async fn list_background_job_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
78        let mgr = &self.metadata_manager;
79        let job_info = mgr
80            .catalog_controller
81            .list_background_creating_jobs(false)
82            .await?;
83
84        try_join_all(
85            job_info
86                .into_iter()
87                .map(|(id, definition, _init_at)| async move {
88                    let table_id = TableId::new(id as _);
89                    let stream_job_fragments =
90                        mgr.catalog_controller.get_job_fragments_by_id(id).await?;
91                    assert_eq!(stream_job_fragments.stream_job_id(), table_id);
92                    Ok((definition, stream_job_fragments))
93                }),
94        )
95        .await
96        // If failed, enter recovery mode.
97    }
98
99    /// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
100    /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor
101    /// will create or drop before this barrier flow through them.
102    async fn resolve_graph_info(
103        &self,
104        database_id: Option<DatabaseId>,
105    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
106        let database_id = database_id.map(|database_id| database_id.database_id as _);
107        let all_actor_infos = self
108            .metadata_manager
109            .catalog_controller
110            .load_all_actors(database_id)
111            .await?;
112
113        Ok(all_actor_infos
114            .into_iter()
115            .map(|(loaded_database_id, job_fragment_infos)| {
116                if let Some(database_id) = database_id {
117                    assert_eq!(database_id, loaded_database_id);
118                }
119                (
120                    DatabaseId::new(loaded_database_id as _),
121                    job_fragment_infos
122                        .into_iter()
123                        .map(|(job_id, fragment_infos)| {
124                            let job_id = TableId::new(job_id as _);
125                            (
126                                job_id,
127                                InflightStreamingJobInfo {
128                                    job_id,
129                                    fragment_infos: fragment_infos
130                                        .into_iter()
131                                        .map(|(fragment_id, info)| (fragment_id as _, info))
132                                        .collect(),
133                                },
134                            )
135                        })
136                        .collect(),
137                )
138            })
139            .collect())
140    }
141
    /// Derive, from the current Hummock `version`, the epoch information needed
    /// for recovery:
    ///
    /// - the committed epoch of every state table in the version, and
    /// - for each upstream table that a snapshot-backfill background job still
    ///   lags behind, the change-log epochs
    ///   (`(non_checkpoint_epochs, checkpoint_epoch)` entries) covering the gap
    ///   between the downstream's pinned epoch and the upstream's committed epoch.
    ///
    /// Errors if a downstream is ahead of its upstream, or if the gap cannot be
    /// covered by the upstream's table change log.
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        // For each upstream mv table: the minimum epoch pinned by any downstream
        // snapshot-backfill job, i.e. the earliest point from which its change
        // log must still be replayable.
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let Ok(job_committed_epoch) = get_table_committed_epoch(job.stream_job_id) else {
                // Question: should we get the committed epoch from any state tables in the job?
                warn!(
                    "background job {} has no committed epoch, skip resolving epochs",
                    job.stream_job_id
                );
                continue;
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    // The job has consumed the upstream up to at least
                    // max(snapshot epoch, its own committed epoch).
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    // A downstream can never be ahead of its upstream — corrupt state.
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    // Fully caught up: no change log needs to be replayed.
                    continue;
                }
                Ordering::Greater => {
                    // Downstream lags behind: collect the change-log epochs
                    // between the two committed epochs for replay.
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // The log starts exactly at the downstream's committed
                            // epoch, so the gap is fully covered — nothing to do.
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }
251
252    pub(super) async fn reload_runtime_info_impl(
253        &self,
254    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
255        {
256            {
257                {
258                    self.clean_dirty_streaming_jobs(None)
259                        .await
260                        .context("clean dirty streaming jobs")?;
261
262                    // Background job progress needs to be recovered.
263                    tracing::info!("recovering background job progress");
264                    let background_jobs = {
265                        let jobs = self
266                            .list_background_job_progress()
267                            .await
268                            .context("recover background job progress should not fail")?;
269                        let mut background_jobs = HashMap::new();
270                        for (definition, stream_job_fragments) in jobs {
271                            if stream_job_fragments
272                                .tracking_progress_actor_ids()
273                                .is_empty()
274                            {
275                                // If there's no tracking actor in the job, we can finish the job directly.
276                                self.metadata_manager
277                                    .catalog_controller
278                                    .finish_streaming_job(
279                                        stream_job_fragments.stream_job_id().table_id as _,
280                                    )
281                                    .await?;
282                            } else {
283                                background_jobs
284                                    .try_insert(
285                                        stream_job_fragments.stream_job_id(),
286                                        (definition, stream_job_fragments),
287                                    )
288                                    .expect("non-duplicate");
289                            }
290                        }
291                        background_jobs
292                    };
293
294                    tracing::info!("recovered background job progress");
295
296                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
297                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
298
299                    let mut active_streaming_nodes =
300                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
301                            .await?;
302
303                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
304                    info!(
305                        "background streaming jobs: {:?} total {}",
306                        background_streaming_jobs,
307                        background_streaming_jobs.len()
308                    );
309
310                    let unreschedulable_jobs = {
311                        let mut unreschedulable_jobs = HashSet::new();
312
313                        for job_id in background_streaming_jobs {
314                            let scan_types = self
315                                .metadata_manager
316                                .get_job_backfill_scan_types(&job_id)
317                                .await?;
318
319                            if scan_types
320                                .values()
321                                .any(|scan_type| !scan_type.is_reschedulable())
322                            {
323                                unreschedulable_jobs.insert(job_id);
324                            }
325                        }
326
327                        unreschedulable_jobs
328                    };
329
330                    if !unreschedulable_jobs.is_empty() {
331                        tracing::info!(
332                            "unreschedulable background jobs: {:?}",
333                            unreschedulable_jobs
334                        );
335                    }
336
337                    // Resolve actor info for recovery. If there's no actor to recover, most of the
338                    // following steps will be no-op, while the compute nodes will still be reset.
339                    // FIXME: Transactions should be used.
340                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
341                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
342                        && unreschedulable_jobs.is_empty()
343                    {
344                        info!("trigger offline scaling");
345                        self.scale_actors(&active_streaming_nodes)
346                            .await
347                            .inspect_err(|err| {
348                                warn!(error = %err.as_report(), "scale actors failed");
349                            })?;
350
351                        self.resolve_graph_info(None).await.inspect_err(|err| {
352                            warn!(error = %err.as_report(), "resolve actor info failed");
353                        })?
354                    } else {
355                        info!("trigger actor migration");
356                        // Migrate actors in expired CN to newly joined one.
357                        self.migrate_actors(&mut active_streaming_nodes)
358                            .await
359                            .inspect_err(|err| {
360                                warn!(error = %err.as_report(), "migrate actors failed");
361                            })?
362                    };
363
364                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
365                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
366                            warn!(error = %err.as_report(), "resolve actor info failed");
367                        })?
368                    }
369
370                    let info = info;
371
372                    self.purge_state_table_from_hummock(
373                        &info
374                            .values()
375                            .flatten()
376                            .flat_map(|(_, job)| job.existing_table_ids())
377                            .collect(),
378                    )
379                    .await
380                    .context("purge state table from hummock")?;
381
382                    let (state_table_committed_epochs, state_table_log_epochs) = self
383                        .hummock_manager
384                        .on_current_version(|version| {
385                            Self::resolve_hummock_version_epochs(&background_jobs, version)
386                        })
387                        .await?;
388
389                    let subscription_infos = self
390                        .metadata_manager
391                        .get_mv_depended_subscriptions(None)
392                        .await?
393                        .into_iter()
394                        .map(|(database_id, mv_depended_subscriptions)| {
395                            (
396                                database_id,
397                                InflightSubscriptionInfo {
398                                    mv_depended_subscriptions,
399                                },
400                            )
401                        })
402                        .collect();
403
404                    // update and build all actors.
405                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
406                        warn!(error = %err.as_report(), "update actors failed");
407                    })?;
408
409                    let fragment_relations = self
410                        .metadata_manager
411                        .catalog_controller
412                        .get_fragment_downstream_relations(
413                            info.values()
414                                .flatten()
415                                .flat_map(|(_, job)| job.fragment_infos())
416                                .map(|fragment| fragment.fragment_id as _)
417                                .collect(),
418                        )
419                        .await?;
420
421                    let background_jobs = {
422                        let jobs = self
423                            .list_background_job_progress()
424                            .await
425                            .context("recover background job progress should not fail")?;
426                        let mut background_jobs = HashMap::new();
427                        for (definition, stream_job_fragments) in jobs {
428                            background_jobs
429                                .try_insert(
430                                    stream_job_fragments.stream_job_id(),
431                                    (definition, stream_job_fragments),
432                                )
433                                .expect("non-duplicate");
434                        }
435                        background_jobs
436                    };
437
438                    let database_infos = self
439                        .metadata_manager
440                        .catalog_controller
441                        .list_databases()
442                        .await?;
443
444                    // get split assignments for all actors
445                    let source_splits = self.source_manager.list_assignments().await;
446                    let cdc_table_backfill_actors = self
447                        .metadata_manager
448                        .catalog_controller
449                        .cdc_table_backfill_actor_ids()
450                        .await?;
451                    let cdc_table_snapshot_split_assignment =
452                        assign_cdc_table_snapshot_splits_pairs(
453                            cdc_table_backfill_actors,
454                            self.env.meta_store_ref(),
455                        )
456                        .await?;
457                    Ok(BarrierWorkerRuntimeInfoSnapshot {
458                        active_streaming_nodes,
459                        database_job_infos: info,
460                        state_table_committed_epochs,
461                        state_table_log_epochs,
462                        subscription_infos,
463                        stream_actors,
464                        fragment_relations,
465                        source_splits,
466                        background_jobs,
467                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
468                        database_infos,
469                        cdc_table_snapshot_split_assignment,
470                    })
471                }
472            }
473        }
474    }
475
    /// Rebuild the [`DatabaseRuntimeInfoSnapshot`] for a single database during
    /// per-database recovery.
    ///
    /// Returns `Ok(None)` when no in-flight graph info exists for the database,
    /// i.e. there is nothing to recover.
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress()
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        // A single database was requested, so at most one entry can be loaded.
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        // Keep only the background jobs that belong to this database's graph.
        // `list_background_job_progress` is global, so jobs of other databases
        // are skipped here.
        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the job, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        // A single database was requested, so at most one entry is returned.
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        // get split assignments for all actors
        let source_splits = self.source_manager.list_assignments().await;

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
        )
        .await?;
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }
607}
608
609impl GlobalBarrierWorkerContextImpl {
    /// Maximum time `migrate_actors` waits for replacement workers to join
    /// before it force-extends the worker slots of the surviving nodes.
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);
612
    /// Migrate actors in expired CNs to newly joined ones, then return the
    /// freshly resolved per-database streaming job graph info.
    /// (The old note "return true if any actor is migrated" was stale — see the
    /// signature.)
    ///
    /// Waits up to [`Self::RECOVERY_FORCE_MIGRATION_TIMEOUT`] for new workers to
    /// join; after that, it force-extends the slot count of surviving workers so
    /// a migration plan can always be produced.
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        // all worker slots used by actors
        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        // All slots offered by currently active workers.
        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        // Slots that still hold actors but are no longer backed by an active worker.
        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        // Active slots already occupied by actors; migration targets must avoid these.
        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
        // Repeatedly look for free target slots until every expired slot has one.
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            // Past the timeout with still too few free slots: synthesize extra
            // slots by doubling the per-worker slot count until enough exist.
            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    // Fill lower slot indices across all workers first, so the
                    // extra load spreads over the surviving workers.
                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        // Every expired slot has a target; stop searching.
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // wait to get newly joined CN
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "get worker changed or timed out. Retry migrate");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }
758
    /// Reset the distribution of all streaming actors onto the currently active
    /// worker set ("offline scaling", used during recovery).
    ///
    /// Steps:
    /// 1. Take the reschedule write lock without blocking — recovery must not
    ///    wait behind an in-flight reschedule.
    /// 2. Run the scale controller's integrity check and bail out on failure.
    /// 3. Derive a target parallelism for every streaming job from its assigned
    ///    parallelism, its actual fragment parallelism, and the cluster's total
    ///    available parallelism (see `derive_target_parallelism`).
    /// 4. Generate and apply reschedule plans in batches of
    ///    `parallelism_control_batch_size` jobs.
    ///
    /// Returns an error if the lock cannot be acquired, the integrity check
    /// fails, or any plan generation/application step fails.
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        // `try_write` instead of `write`: fail fast rather than deadlock/stall
        // recovery behind another in-progress reschedule.
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        // Workers eligible to host streaming actors after recovery.
        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        // NOTE(review): this sums parallelism over *all* current nodes, while
        // `available_workers` above filters on `is_streaming_schedulable()` —
        // confirm whether unschedulable workers should count toward the total.
        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        // job table id -> derived target parallelism; also used later as the
        // baseline for the debug_assert after plan analysis.
        let mut table_parallelisms = HashMap::new();

        // Build the per-job reschedule targets from the persisted parallelism
        // settings and the actual fragment parallelism in the catalog.
        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                // Map the persisted (model-layer) parallelism to the in-memory one.
                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        // Offline scaling only touches parallelism, never the
                        // job's resource group.
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        // Process jobs in batches to bound the size of each generated plan;
        // `.max(1)` guards against a misconfigured batch size of 0.
        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: local_reschedule_targets,
                    },
                    false,
                )
                .await?;

            // no need to update
            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            // skip reschedule if no reschedule is generated.
            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            // Recovery re-creates actors itself; the analyzer
                            // must not spawn new ones here.
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }
924
925    // We infer the new parallelism strategy based on the prior level of parallelism of the table.
926    // If the parallelism strategy is Fixed or Auto, we won't make any modifications.
927    // For Custom, we'll assess the parallelism of the core fragment;
928    // if the parallelism is higher than the currently available parallelism, we'll set it to Adaptive.
929    // If it's lower, we'll set it to Fixed.
930    // If it was previously set to Adaptive, but the default_parallelism in the configuration isn’t Full,
931    // and it matches the actual fragment parallelism, in this case, it will be handled by downgrading to Fixed.
932    fn derive_target_parallelism(
933        available_parallelism: usize,
934        assigned_parallelism: TableParallelism,
935        actual_fragment_parallelism: Option<usize>,
936        default_parallelism: DefaultParallelism,
937    ) -> TableParallelism {
938        match assigned_parallelism {
939            TableParallelism::Custom => {
940                if let Some(fragment_parallelism) = actual_fragment_parallelism {
941                    if fragment_parallelism >= available_parallelism {
942                        TableParallelism::Adaptive
943                    } else {
944                        TableParallelism::Fixed(fragment_parallelism)
945                    }
946                } else {
947                    TableParallelism::Adaptive
948                }
949            }
950            TableParallelism::Adaptive => {
951                match (default_parallelism, actual_fragment_parallelism) {
952                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
953                        if fragment_parallelism == n.get() =>
954                    {
955                        TableParallelism::Fixed(fragment_parallelism)
956                    }
957                    _ => TableParallelism::Adaptive,
958                }
959            }
960            _ => assigned_parallelism,
961        }
962    }
963
    /// Load all currently active actors, keyed by actor id, from the metadata
    /// manager. This is a read-only lookup; despite the previous wording, it
    /// does not push any update to compute nodes.
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
968}
969
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    /// Table-driven checks for `derive_target_parallelism`, covering the
    /// Custom, Adaptive, and Fixed branches.
    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual unknown, default full -> adaptive
        // (was mislabeled as "assigned fixed(5) -> fixed(5)")
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned fixed(3), actual 5, default full -> fixed(3) (unchanged)
        assert_eq!(
            TableParallelism::Fixed(3),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Fixed(3),
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual _, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}