risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up the catalogs of creating streaming jobs that are in foreground mode or whose
    /// table fragments have not been persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        // Unregister the sources associated with the cleaned-up jobs.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

    async fn list_background_job_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        try_join_all(
            job_info
                .into_iter()
                .map(|(id, definition, _init_at)| async move {
                    let table_id = TableId::new(id as _);
                    let stream_job_fragments =
                        mgr.catalog_controller.get_job_fragments_by_id(id).await?;
                    assert_eq!(stream_job_fragments.stream_job_id(), table_id);
                    Ok((definition, stream_job_fragments))
                }),
        )
        .await
        // If failed, enter recovery mode.
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to modify the set of actors to be sent or collected, because
    /// these actors will be created or dropped before this barrier flows through them.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

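    /// Resolve, from the current Hummock `version`, the committed epoch of every state table,
    /// together with the change-log epochs that recovering snapshot-backfill jobs still need to
    /// replay from their upstream tables.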
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
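        // For each recovering snapshot-backfill job, pin every upstream table at the smallest
        // epoch any downstream job still depends on, i.e. the later of the backfill snapshot
        // epoch and the job's own committed epoch, minimized across all downstream jobs.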
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let Ok(job_committed_epoch) = get_table_committed_epoch(job.stream_job_id) else {
                // Question: should we get the committed epoch from any state tables in the job?
                warn!(
                    "background job {} has no committed epoch, skip resolving epochs",
                    job.stream_job_id
                );
                continue;
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
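        // For every pinned upstream table, collect the change-log epochs between the downstream
        // committed epoch and the upstream committed epoch, so the downstream job can catch up
        // by replaying the log after recovery.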
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

    /// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically and
    /// does not persist its newly added or deleted upstreams in the meta store. Therefore, when
    /// restoring jobs, we need to rebuild the information required by the operator from the
    /// current state of its upstreams (sinks) and downstream (table).
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (tid, job) in table_map {
                    if acc.insert(tid.table_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate table id found: {:?}", tid).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let job_ids = jobs.keys().cloned().collect_vec();
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(job_ids.into_iter().map(|id| id as _).collect())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragments = jobs.get_mut(&table.id).unwrap();
            let mut target_fragment_id = None;
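            // Locate the fragment that contains the `UpstreamSinkUnion` operator; tables created
            // by older versions may not have such a fragment yet.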
            for fragment in fragments.fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragments
                .fragment_infos
                .get_mut(&target_fragment_id)
                .unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!("recovering background job progress");
        let background_jobs = {
            let jobs = self
                .list_background_job_progress()
                .await
                .context("recover background job progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the job, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        tracing::info!("recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let mut active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                .await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
        info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

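        // Collect background jobs whose backfill scan type is not reschedulable; offline scaling
        // below is skipped whenever any such job exists.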
        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(&job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        // Resolve actor info for recovery. If there's no actor to recover, most of the
        // following steps will be no-op, while the compute nodes will still be reset.
        // FIXME: Transactions should be used.
        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
        let mut info = if !self.env.opts.disable_automatic_parallelism_control
            && unreschedulable_jobs.is_empty()
        {
            info!("trigger offline scaling");
            self.scale_actors(&active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "scale actors failed");
                })?;

            self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        } else {
            info!("trigger actor migration");
            // Migrate actors on expired CNs to newly joined ones.
            self.migrate_actors(&mut active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "migrate actors failed");
                })?
        };

        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(
                    dropped_table_ids.into_iter().map(|id| id.table_id as _),
                )
                .await;
            info = self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, job)| job.existing_table_ids())
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?
            .into_iter()
            .map(|(database_id, mv_depended_subscriptions)| {
                (
                    database_id,
                    InflightSubscriptionInfo {
                        mv_depended_subscriptions,
                    },
                )
            })
            .collect();

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.fragment_infos())
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let background_jobs = {
            let jobs = self
                .list_background_job_progress()
                .await
                .context("recover background job progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                background_jobs
                    .try_insert(
                        stream_job_fragments.stream_job_id(),
                        (definition, stream_job_fragments),
                    )
                    .expect("non-duplicate");
            }
            background_jobs
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, job) in info.values().flatten() {
            for fragment in job.fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

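        // Reassign CDC table snapshot (backfill) splits to the CDC backfill actors; a new
        // assignment generation is allocated only when the assignment is non-empty.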
        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment =
            assign_cdc_table_snapshot_splits_pairs(
                cdc_table_backfill_actors,
                self.env.meta_store_ref(),
                self.env.cdc_table_backfill_tracker.completed_job_ids(),
            )
            .await?;
        let cdc_table_snapshot_split_assignment =
            if cdc_table_snapshot_split_assignment.is_empty() {
                CdcTableSnapshotSplitAssignmentWithGeneration::empty()
            } else {
                let generation = self
                    .env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter());
                CdcTableSnapshotSplitAssignmentWithGeneration::new(
                    cdc_table_snapshot_split_assignment,
                    generation,
                )
            };
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_infos,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress()
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids.into_iter().map(|id| id.table_id as _))
            .await;

        let mut info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

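        // Keep only the background jobs that belong to this database; jobs with no tracking
        // actors left are finished directly instead of being re-tracked.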
        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If there's no tracking actor in the job, we can finish the job directly.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // update and build all actors.
        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for fragment in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

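        // Reassign CDC table snapshot splits, bumping the assignment generation only when the
        // assignment is non-empty (mirrors the logic in `reload_runtime_info_impl`).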
        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
    // How long to wait for new workers to join before forcing migration onto the remaining ones.
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

    /// Migrate actors on expired CNs to newly joined ones, and return the re-resolved graph info
    /// after the migration.
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        // all worker slots used by actors
        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
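        // Build the migration plan by repeatedly mapping expired worker slots to free slots on
        // active workers. If not enough free slots show up before the force-migration timeout,
        // oversubscribe the active workers by synthesizing extra slots (factor 2, 4, 8, ...).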
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // wait to get newly joined CN
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "get worker changed or timed out. Retry migrate");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }

    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

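        // Derive a reschedule target for every streaming job from its persisted parallelism
        // setting and the parallelism currently available in the cluster.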
        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: local_reschedule_targets,
                    },
                    false,
                )
                .await?;

            // no need to update
            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            // skip reschedule if no reschedule is generated.
            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }

    // We infer the new parallelism strategy from the table's prior parallelism setting.
    // If the strategy is Fixed, we keep it unchanged.
    // For Custom, we look at the actual parallelism of the job's fragments: if it is at least the
    // currently available parallelism (or unknown), we switch to Adaptive; otherwise we pin it to
    // Fixed at that value.
    // If it was previously Adaptive but the configured default_parallelism isn't Full and matches
    // the actual fragment parallelism, we downgrade it to Fixed at that value.
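    // For example, with 10 available slots a Custom job actually running on 5 actors becomes
    // Fixed(5), while one running on 10 or more actors becomes Adaptive (see the tests below).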
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

    /// Load all active actors.
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual unknown, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual unknown, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}