risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs of creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

    async fn list_background_job_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        try_join_all(
            job_info
                .into_iter()
                .map(|(id, definition, _init_at)| async move {
                    let table_id = TableId::new(id as _);
                    let stream_job_fragments =
                        mgr.catalog_controller.get_job_fragments_by_id(id).await?;
                    assert_eq!(stream_job_fragments.stream_job_id(), table_id);
                    Ok((definition, stream_job_fragments))
                }),
        )
        .await
        // If this fails, the caller enters recovery mode.
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to modify the actors to be sent or collected, because these actors
    /// will be created or dropped before this barrier flows through them.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

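    /// Derive, from the current Hummock version, the committed epoch of every state table, plus
    /// the change-log epochs that snapshot-backfill background jobs still need to replay from
    /// their upstream tables when the downstream committed epoch lags behind the upstream one.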
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let Ok(job_committed_epoch) = get_table_committed_epoch(job.stream_job_id) else {
                // Question: should we get the committed epoch from any state tables in the job?
                warn!(
                    "background job {} has no committed epoch, skip resolving epochs",
                    job.stream_job_id
                );
                continue;
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

    /// For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically and its newly added or
    /// deleted upstreams are not persisted in the meta store. Therefore, when restoring jobs, we need to rebuild the
    /// information required by the operator from the current state of its upstreams (sinks) and downstream (table).
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (tid, job) in table_map {
                    if acc.insert(tid.table_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate table id found: {:?}", tid).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let job_ids = jobs.keys().cloned().collect_vec();
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(job_ids.into_iter().map(|id| id as _).collect())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragments = jobs.get_mut(&table.id).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragments.fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragments
                .fragment_infos
                .get_mut(&target_fragment_id)
                .unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

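    /// Rebuild the full [`BarrierWorkerRuntimeInfoSnapshot`] during global recovery: clean up
    /// dirty streaming jobs, recover background job progress, migrate or rescale actors, and
    /// reload graph, epoch, subscription, source-split and CDC backfill state from metadata.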
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    // Background job progress needs to be recovered.
                    tracing::info!("recovering background job progress");
                    let background_jobs = {
                        let jobs = self
                            .list_background_job_progress()
                            .await
                            .context("recover background job progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            if stream_job_fragments
                                .tracking_progress_actor_ids()
                                .is_empty()
                            {
                                // If there's no tracking actor in the job, we can finish the job directly.
                                self.metadata_manager
                                    .catalog_controller
                                    .finish_streaming_job(
                                        stream_job_fragments.stream_job_id().table_id as _,
                                    )
                                    .await?;
                            } else {
                                background_jobs
                                    .try_insert(
                                        stream_job_fragments.stream_job_id(),
                                        (definition, stream_job_fragments),
                                    )
                                    .expect("non-duplicate");
                            }
                        }
                        background_jobs
                    };

                    tracing::info!("recovered background job progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

                    let mut active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
                    info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(&job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there's no actor to recover, most of the
                    // following steps will be no-ops, while the compute nodes will still be reset.
                    // FIXME: Transactions should be used.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
                        && unreschedulable_jobs.is_empty()
                    {
                        info!("trigger offline scaling");
                        self.scale_actors(&active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "scale actors failed");
                            })?;

                        self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    } else {
                        info!("trigger actor migration");
                        // Migrate actors on expired CNs to newly joined ones.
                        self.migrate_actors(&mut active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "migrate actors failed");
                            })?
                    };

                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    }

                    self.recovery_table_with_upstream_sinks(&mut info).await?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, job)| job.existing_table_ids())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(&background_jobs, version)
                        })
                        .await?;

                    let subscription_infos = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?
                        .into_iter()
                        .map(|(database_id, mv_depended_subscriptions)| {
                            (
                                database_id,
                                InflightSubscriptionInfo {
                                    mv_depended_subscriptions,
                                },
                            )
                        })
                        .collect();

                    // update and build all actors.
                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
                        warn!(error = %err.as_report(), "update actors failed");
                    })?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.fragment_infos())
                                .map(|fragment| fragment.fragment_id as _)
                                .collect(),
                        )
                        .await?;

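                    // Re-list background job progress: the set of creating jobs may have changed
                    // during the steps above (e.g. jobs finished directly, or removed by the
                    // pre-applied drop/cancel).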
                    let background_jobs = {
                        let jobs = self
                            .list_background_job_progress()
                            .await
                            .context("recover background job progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            background_jobs
                                .try_insert(
                                    stream_job_fragments.stream_job_id(),
                                    (definition, stream_job_fragments),
                                )
                                .expect("non-duplicate");
                        }
                        background_jobs
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    // get split assignments for all actors
                    let source_splits = self.source_manager.list_assignments().await;
                    let cdc_table_backfill_actors = self
                        .metadata_manager
                        .catalog_controller
                        .cdc_table_backfill_actor_ids()
                        .await?;
                    let cdc_table_ids = cdc_table_backfill_actors
                        .keys()
                        .cloned()
                        .collect::<Vec<_>>();
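                    // Assign CDC table snapshot splits to the backfill actors; a fresh assignment
                    // generation is attached only when the assignment is non-empty.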
                    let cdc_table_snapshot_split_assignment =
                        assign_cdc_table_snapshot_splits_pairs(
                            cdc_table_backfill_actors,
                            self.env.meta_store_ref(),
                            self.env.cdc_table_backfill_tracker.completed_job_ids(),
                        )
                        .await?;
                    let cdc_table_snapshot_split_assignment =
                        if cdc_table_snapshot_split_assignment.is_empty() {
                            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                        } else {
                            let generation = self
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(cdc_table_ids.into_iter());
                            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                                cdc_table_snapshot_split_assignment,
                                generation,
                            )
                        };
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        subscription_infos,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_split_assignment,
                    })
                }
            }
        }
    }

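    /// Per-database counterpart of [`Self::reload_runtime_info_impl`]: rebuild the runtime info
    /// snapshot for a single database, returning `None` if the database has no inflight jobs.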
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress()
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let mut info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the job, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        // get split assignments for all actors
        let source_splits = self.source_manager.list_assignments().await;

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
    // Migration timeout.
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

    /// Migrate actors on expired CNs to newly joined ones, and return the re-resolved graph info
    /// after the migration.
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        // all worker slots used by actors
        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
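        // Build the migration plan: repeatedly pair free slots on active workers with expired
        // slots, waiting for newly joined workers when there are not enough free slots.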
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

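            // If there are still not enough free slots after the force-migration timeout,
            // virtually extend the parallelism of each active worker (over-subscribing their
            // slots) so that all expired actors can be placed.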
            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // Wait for newly joined CNs.
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "get worker changed or timed out. Retry migrate");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }

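    /// Offline scaling during recovery: re-derive the target parallelism of every streaming job
    /// from the currently available workers and apply the resulting reschedule plans in batches.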
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

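        // Apply the reschedule targets in batches to bound the size of each generated plan
        // (batch size is controlled by `parallelism_control_batch_size`).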
        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: local_reschedule_targets,
                    },
                    false,
                )
                .await?;

            // no need to update
            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            // skip reschedule if no reschedule is generated.
            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Since custom parallelism no longer exists at this point, this call won't trigger a
            // no-shuffle rewrite of the table parallelisms.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }

    // We infer the new parallelism strategy from the table's prior parallelism setting.
    // Fixed parallelism is kept unchanged.
    // For Custom, we assess the parallelism of the core fragment: if it is greater than or equal
    // to the currently available parallelism, we set it to Adaptive; otherwise we set it to
    // Fixed with the actual fragment parallelism.
    // If it was previously Adaptive but the configured default_parallelism isn't Full and matches
    // the actual fragment parallelism, it is downgraded to Fixed.
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

    /// Load all active actors from the metadata manager.
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual none, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual _, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}