risingwave_meta/stream/
scale.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::future;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::DatabaseId;
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt};
25use risingwave_connector::source::{SplitId, SplitMetaData};
26use risingwave_meta_model::{
27    StreamingParallelism, WorkerId, fragment, fragment_relation, object, streaming_job,
28};
29use risingwave_pb::common::{WorkerNode, WorkerType};
30use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher};
31use sea_orm::{ConnectionTrait, JoinType, QuerySelect, RelationTrait};
32use thiserror_ext::AsReport;
33use tokio::sync::oneshot::Receiver;
34use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
35use tokio::task::JoinHandle;
36use tokio::time::{Instant, MissedTickBehavior};
37
38use crate::barrier::{Command, Reschedule, RescheduleContext, ReschedulePlan};
39use crate::controller::scale::{
40    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble,
41    find_fragment_no_shuffle_dags_detailed, load_fragment_context, load_fragment_context_for_jobs,
42};
43use crate::error::bail_invalid_parameter;
44use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager};
45use crate::model::{ActorId, FragmentId, StreamActor, StreamActorWithDispatchers};
46use crate::stream::{GlobalStreamManager, SourceManagerRef};
47use crate::{MetaError, MetaResult};
48
49#[derive(Debug, Clone, Eq, PartialEq)]
50pub struct WorkerReschedule {
51    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
52}
53
54use risingwave_common::id::JobId;
55use risingwave_meta_model::DispatcherType;
56use risingwave_meta_model::fragment::DistributionType;
57use risingwave_meta_model::prelude::{Fragment, FragmentRelation, StreamingJob};
58use sea_orm::ActiveValue::Set;
59use sea_orm::{
60    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
61};
62
63use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
64use crate::controller::utils::{
65    StreamingJobExtraInfo, compose_dispatchers, get_streaming_job_extra_info,
66};
67
68pub type ScaleControllerRef = Arc<ScaleController>;
69
70// Field order is intentional: derived `Ord` is used to canonicalize actor
71// layout for no-op detection, so new fields must preserve meaningful
72// comparison order.
73#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
74struct NormalizedActor {
75    worker_id: WorkerId,
76    vnode_bitmap: Option<Vec<usize>>,
77    splits: Vec<SplitId>,
78}
79
80#[derive(Clone, Debug, PartialEq, Eq)]
81struct NormalizedFragmentLayout {
82    distribution_type: DistributionType,
83    actors: Vec<NormalizedActor>,
84}
85
86fn normalize_actor_info(info: &InflightActorInfo) -> NormalizedActor {
87    let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
88        bitmap
89            .iter_vnodes()
90            .map(|vnode| vnode.to_index())
91            .collect_vec()
92    });
93    let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
94    splits.sort_unstable();
95    NormalizedActor {
96        worker_id: info.worker_id,
97        vnode_bitmap,
98        splits,
99    }
100}
101
102fn build_normalized_fragment_layout(
103    fragment_info: &InflightFragmentInfo,
104) -> NormalizedFragmentLayout {
105    // Actor ids are intentionally erased here. For no-op detection we only care
106    // about the semantic layout carried by worker placement, vnode ownership,
107    // and split assignment.
108    let mut actors = fragment_info
109        .actors
110        .values()
111        .map(normalize_actor_info)
112        .collect_vec();
113    actors.sort_unstable();
114
115    NormalizedFragmentLayout {
116        distribution_type: fragment_info.distribution_type,
117        actors,
118    }
119}
120
121/// Compare the rendered (preview) layout against the current in-flight layout
122/// to decide whether a reschedule is a no-op.
123///
124/// Dispatcher layout is intentionally not compared here. On the reschedule path
125/// it is a deterministic function of the fragment actor layout together with the
126/// stable fragment-relation metadata held in `RescheduleContext`, so fragment-level
127/// equality already implies dispatcher equality.
128///
129/// Only fragments present in `render_result` are compared. This is intentional:
130/// this function sits on the reschedule path, which only re-renders fragments
131/// that were loaded into the `RescheduleContext`. Fragment creation or deletion
132/// is handled by separate DDL / recovery paths, not by reschedule, so fragments
133/// outside the render set are irrelevant here. In other words, this is a subset
134/// check over the rendered fragments, not a full bidirectional equality check.
135pub(crate) fn rendered_layout_matches_current(
136    render_result: &FragmentRenderMap,
137    all_prev_fragments: &HashMap<FragmentId, &InflightFragmentInfo>,
138) -> MetaResult<bool> {
139    let all_rendered_fragments: HashMap<_, _> = render_result
140        .values()
141        .flat_map(|jobs| jobs.values())
142        .flatten()
143        .map(|(fragment_id, info)| (*fragment_id, info))
144        .collect();
145
146    for (fragment_id, rendered_fragment) in &all_rendered_fragments {
147        let Some(prev_fragment) = all_prev_fragments.get(fragment_id).copied() else {
148            return Err(MetaError::from(anyhow!(
149                "previous fragment info for {} not found",
150                fragment_id
151            )));
152        };
153
154        let rendered_layout = build_normalized_fragment_layout(rendered_fragment);
155        let current_layout = build_normalized_fragment_layout(prev_fragment);
156
157        if rendered_layout != current_layout {
158            return Ok(false);
159        }
160    }
161
162    Ok(true)
163}
164
/// Controller for in-place rescheduling of streaming jobs and fragments.
///
/// Its methods update parallelism / resource-group settings in the catalog and
/// build `Command::RescheduleIntent` commands keyed by database.
pub struct ScaleController {
    /// Metadata access; the reschedule methods lock `catalog_controller.inner` and
    /// open transactions on its database.
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    /// We will acquire lock during DDL to prevent scaling operations on jobs that are in the creating state.
    /// e.g., a MV cannot be rescheduled during foreground backfill.
    pub reschedule_lock: RwLock<()>,
}
176
177impl ScaleController {
178    pub fn new(
179        metadata_manager: &MetadataManager,
180        source_manager: SourceManagerRef,
181        env: MetaSrvEnv,
182    ) -> Self {
183        Self {
184            metadata_manager: metadata_manager.clone(),
185            source_manager,
186            env,
187            reschedule_lock: RwLock::new(()),
188        }
189    }
190
191    pub async fn resolve_related_no_shuffle_jobs(
192        &self,
193        jobs: &[JobId],
194    ) -> MetaResult<HashSet<JobId>> {
195        let inner = self.metadata_manager.catalog_controller.inner.read().await;
196        let txn = inner.db.begin().await?;
197
198        let fragment_ids: Vec<_> = Fragment::find()
199            .select_only()
200            .column(fragment::Column::FragmentId)
201            .filter(fragment::Column::JobId.is_in(jobs.to_vec()))
202            .into_tuple()
203            .all(&txn)
204            .await?;
205        let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
206        let related_fragments = ensembles
207            .iter()
208            .flat_map(|ensemble| ensemble.fragments())
209            .collect_vec();
210
211        let job_ids: Vec<_> = Fragment::find()
212            .select_only()
213            .column(fragment::Column::JobId)
214            .filter(fragment::Column::FragmentId.is_in(related_fragments))
215            .into_tuple()
216            .all(&txn)
217            .await?;
218
219        let job_ids = job_ids.into_iter().collect();
220
221        Ok(job_ids)
222    }
223
224    pub async fn reschedule_inplace(
225        &self,
226        policy: HashMap<JobId, ReschedulePolicy>,
227    ) -> MetaResult<HashMap<DatabaseId, Command>> {
228        let inner = self.metadata_manager.catalog_controller.inner.write().await;
229        let txn = inner.db.begin().await?;
230
231        for (table_id, target) in &policy {
232            let streaming_job = StreamingJob::find_by_id(*table_id)
233                .one(&txn)
234                .await?
235                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
236
237            let max_parallelism = streaming_job.max_parallelism;
238
239            let mut streaming_job = streaming_job.into_active_model();
240
241            match &target {
242                ReschedulePolicy::Parallelism(p) | ReschedulePolicy::Both(p, _) => {
243                    if let StreamingParallelism::Fixed(n) = p.parallelism
244                        && n > max_parallelism as usize
245                    {
246                        bail!(format!(
247                            "specified parallelism {n} should not exceed max parallelism {max_parallelism}"
248                        ));
249                    }
250
251                    streaming_job.parallelism = Set(p.parallelism.clone());
252                    if matches!(p.parallelism, StreamingParallelism::Fixed(_)) {
253                        streaming_job.adaptive_parallelism_strategy = Set(None);
254                    } else {
255                        streaming_job.adaptive_parallelism_strategy =
256                            Set(p.adaptive_parallelism_strategy.clone());
257                    }
258                }
259                _ => {}
260            }
261
262            match &target {
263                ReschedulePolicy::ResourceGroup(r) | ReschedulePolicy::Both(_, r) => {
264                    streaming_job.specific_resource_group = Set(r.resource_group.clone());
265                }
266                _ => {}
267            }
268
269            StreamingJob::update(streaming_job).exec(&txn).await?;
270        }
271
272        let job_ids: HashSet<JobId> = policy.keys().copied().collect();
273        let commands = build_reschedule_intent_for_jobs(&txn, job_ids).await?;
274
275        txn.commit().await?;
276
277        Ok(commands)
278    }
279
280    pub async fn reschedule_backfill_parallelism_inplace(
281        &self,
282        policy: HashMap<JobId, Option<ParallelismPolicy>>,
283    ) -> MetaResult<HashMap<DatabaseId, Command>> {
284        if policy.is_empty() {
285            return Ok(HashMap::new());
286        }
287
288        let inner = self.metadata_manager.catalog_controller.inner.write().await;
289        let txn = inner.db.begin().await?;
290
291        for (table_id, policy) in &policy {
292            let streaming_job = StreamingJob::find_by_id(*table_id)
293                .one(&txn)
294                .await?
295                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;
296
297            let max_parallelism = streaming_job.max_parallelism;
298
299            let mut streaming_job = streaming_job.into_active_model();
300
301            if let Some(ParallelismPolicy {
302                parallelism: StreamingParallelism::Fixed(n),
303                ..
304            }) = policy
305                && *n > max_parallelism as usize
306            {
307                bail!(format!(
308                    "specified backfill parallelism {n} should not exceed max parallelism {max_parallelism}"
309                ));
310            }
311
312            if let Some(policy) = policy {
313                streaming_job.backfill_parallelism = Set(Some(policy.parallelism.clone()));
314                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) {
315                    streaming_job.backfill_adaptive_parallelism_strategy = Set(None);
316                } else {
317                    streaming_job.backfill_adaptive_parallelism_strategy =
318                        Set(policy.adaptive_parallelism_strategy.clone());
319                }
320            } else {
321                streaming_job.backfill_parallelism = Set(None);
322                streaming_job.backfill_adaptive_parallelism_strategy = Set(None);
323            }
324            streaming_job.update(&txn).await?;
325        }
326
327        let jobs = policy.keys().copied().collect();
328
329        let command = build_reschedule_intent_for_jobs(&txn, jobs).await?;
330
331        txn.commit().await?;
332
333        Ok(command)
334    }
335
336    pub async fn reschedule_fragment_inplace(
337        &self,
338        policy: HashMap<risingwave_meta_model::FragmentId, Option<StreamingParallelism>>,
339    ) -> MetaResult<HashMap<DatabaseId, Command>> {
340        if policy.is_empty() {
341            return Ok(HashMap::new());
342        }
343
344        let inner = self.metadata_manager.catalog_controller.inner.write().await;
345        let txn = inner.db.begin().await?;
346
347        let fragment_id_list = policy.keys().copied().collect_vec();
348
349        let existing_fragment_ids: HashSet<_> = Fragment::find()
350            .select_only()
351            .column(fragment::Column::FragmentId)
352            .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
353            .into_tuple::<risingwave_meta_model::FragmentId>()
354            .all(&txn)
355            .await?
356            .into_iter()
357            .collect();
358
359        if let Some(missing_fragment_id) = fragment_id_list
360            .iter()
361            .find(|fragment_id| !existing_fragment_ids.contains(*fragment_id))
362        {
363            return Err(MetaError::catalog_id_not_found(
364                "fragment",
365                *missing_fragment_id,
366            ));
367        }
368
369        let mut target_ensembles = vec![];
370
371        for ensemble in find_fragment_no_shuffle_dags_detailed(&txn, &fragment_id_list).await? {
372            let entry_fragment_ids = ensemble.entry_fragments().collect_vec();
373
374            let desired_parallelism = match entry_fragment_ids
375                .iter()
376                .filter_map(|fragment_id| policy.get(fragment_id).cloned())
377                .dedup()
378                .collect_vec()
379                .as_slice()
380            {
381                [] => {
382                    bail_invalid_parameter!(
383                        "none of the entry fragments {:?} were included in the reschedule request; \
384                         provide at least one entry fragment id",
385                        entry_fragment_ids
386                    );
387                }
388                [parallelism] => parallelism.clone(),
389                parallelisms => {
390                    bail!(
391                        "conflicting reschedule policies for fragments in the same no-shuffle ensemble: {:?}",
392                        parallelisms
393                    );
394                }
395            };
396
397            let fragments = Fragment::find()
398                .filter(fragment::Column::FragmentId.is_in(entry_fragment_ids))
399                .all(&txn)
400                .await?;
401
402            debug_assert!(
403                fragments
404                    .iter()
405                    .map(|fragment| fragment.parallelism.as_ref())
406                    .all_equal(),
407                "entry fragments in the same ensemble should share the same parallelism"
408            );
409
410            let current_parallelism = fragments
411                .first()
412                .and_then(|fragment| fragment.parallelism.clone());
413
414            if current_parallelism == desired_parallelism {
415                continue;
416            }
417
418            for fragment in fragments {
419                let mut fragment = fragment.into_active_model();
420                fragment.parallelism = Set(desired_parallelism.clone());
421                Fragment::update(fragment).exec(&txn).await?;
422            }
423
424            target_ensembles.push(ensemble);
425        }
426
427        if target_ensembles.is_empty() {
428            txn.commit().await?;
429            return Ok(HashMap::new());
430        }
431
432        let target_fragment_ids: HashSet<FragmentId> = target_ensembles
433            .iter()
434            .flat_map(|ensemble| ensemble.component_fragments())
435            .collect();
436        let commands = build_reschedule_intent_for_fragments(&txn, target_fragment_ids).await?;
437
438        txn.commit().await?;
439
440        Ok(commands)
441    }
442
443    async fn rerender(&self, jobs: HashSet<JobId>) -> MetaResult<HashMap<DatabaseId, Command>> {
444        if jobs.is_empty() {
445            return Ok(HashMap::new());
446        }
447
448        let inner = self.metadata_manager.catalog_controller.inner.read().await;
449        let txn = inner.db.begin().await?;
450        let commands = build_reschedule_intent_for_jobs(&txn, jobs).await?;
451        txn.commit().await?;
452        Ok(commands)
453    }
454}
455
456async fn build_reschedule_intent_for_jobs(
457    txn: &impl ConnectionTrait,
458    job_ids: HashSet<JobId>,
459) -> MetaResult<HashMap<DatabaseId, Command>> {
460    if job_ids.is_empty() {
461        return Ok(HashMap::new());
462    }
463
464    let job_id_list = job_ids.iter().copied().collect_vec();
465    let database_jobs: Vec<(DatabaseId, JobId)> = StreamingJob::find()
466        .select_only()
467        .column(object::Column::DatabaseId)
468        .column(streaming_job::Column::JobId)
469        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
470        .filter(streaming_job::Column::JobId.is_in(job_id_list.clone()))
471        .into_tuple()
472        .all(txn)
473        .await?;
474
475    if database_jobs.len() != job_ids.len() {
476        let returned_jobs: HashSet<JobId> =
477            database_jobs.iter().map(|(_, job_id)| *job_id).collect();
478        let missing = job_ids.difference(&returned_jobs).copied().collect_vec();
479        return Err(MetaError::catalog_id_not_found(
480            "streaming job",
481            format!("{missing:?}"),
482        ));
483    }
484
485    let reschedule_context = load_reschedule_context_for_jobs(txn, job_ids).await?;
486    if reschedule_context.is_empty() {
487        return Ok(HashMap::new());
488    }
489
490    let commands = reschedule_context
491        .into_database_contexts()
492        .into_iter()
493        .map(|(database_id, context)| {
494            (
495                database_id,
496                Command::RescheduleIntent {
497                    context,
498                    reschedule_plan: None,
499                },
500            )
501        })
502        .collect();
503
504    Ok(commands)
505}
506
507async fn build_reschedule_intent_for_fragments(
508    txn: &impl ConnectionTrait,
509    fragment_ids: HashSet<FragmentId>,
510) -> MetaResult<HashMap<DatabaseId, Command>> {
511    if fragment_ids.is_empty() {
512        return Ok(HashMap::new());
513    }
514
515    let fragment_id_list = fragment_ids.iter().copied().collect_vec();
516    let fragment_databases: Vec<(FragmentId, DatabaseId)> = Fragment::find()
517        .select_only()
518        .column(fragment::Column::FragmentId)
519        .column(object::Column::DatabaseId)
520        .join(JoinType::LeftJoin, fragment::Relation::Object.def())
521        .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
522        .into_tuple()
523        .all(txn)
524        .await?;
525
526    if fragment_databases.len() != fragment_ids.len() {
527        let returned: HashSet<FragmentId> = fragment_databases
528            .iter()
529            .map(|(fragment_id, _)| *fragment_id)
530            .collect();
531        let missing = fragment_ids.difference(&returned).copied().collect_vec();
532        return Err(MetaError::catalog_id_not_found(
533            "fragment",
534            format!("{missing:?}"),
535        ));
536    }
537
538    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_id_list).await?;
539    let reschedule_context = load_reschedule_context_for_ensembles(txn, ensembles).await?;
540    if reschedule_context.is_empty() {
541        return Ok(HashMap::new());
542    }
543
544    let commands = reschedule_context
545        .into_database_contexts()
546        .into_iter()
547        .map(|(database_id, context)| {
548            (
549                database_id,
550                Command::RescheduleIntent {
551                    context,
552                    reschedule_plan: None,
553                },
554            )
555        })
556        .collect();
557
558    Ok(commands)
559}
560
561async fn load_reschedule_context_for_jobs(
562    txn: &impl ConnectionTrait,
563    job_ids: HashSet<JobId>,
564) -> MetaResult<RescheduleContext> {
565    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
566    build_reschedule_context_from_loaded(txn, loaded).await
567}
568
569async fn load_reschedule_context_for_ensembles(
570    txn: &impl ConnectionTrait,
571    ensembles: Vec<NoShuffleEnsemble>,
572) -> MetaResult<RescheduleContext> {
573    let loaded = load_fragment_context(txn, ensembles).await?;
574    build_reschedule_context_from_loaded(txn, loaded).await
575}
576
577async fn build_reschedule_context_from_loaded(
578    txn: &impl ConnectionTrait,
579    loaded: LoadedFragmentContext,
580) -> MetaResult<RescheduleContext> {
581    if loaded.is_empty() {
582        return Ok(RescheduleContext::empty());
583    }
584
585    let job_ids = loaded.job_map.keys().copied().collect_vec();
586    let job_extra_info = get_streaming_job_extra_info(txn, job_ids).await?;
587
588    let fragment_ids = loaded
589        .job_fragments
590        .values()
591        .flat_map(|fragments| fragments.keys().copied())
592        .collect_vec();
593
594    let upstreams: Vec<(FragmentId, FragmentId, DispatcherType)> = FragmentRelation::find()
595        .select_only()
596        .columns([
597            fragment_relation::Column::TargetFragmentId,
598            fragment_relation::Column::SourceFragmentId,
599            fragment_relation::Column::DispatcherType,
600        ])
601        .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
602        .into_tuple()
603        .all(txn)
604        .await?;
605
606    let mut upstream_fragments = HashMap::new();
607    for (fragment, upstream, dispatcher) in upstreams {
608        upstream_fragments
609            .entry(fragment as FragmentId)
610            .or_insert(HashMap::new())
611            .insert(upstream as FragmentId, dispatcher);
612    }
613
614    let downstreams = FragmentRelation::find()
615        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids.clone()))
616        .all(txn)
617        .await?;
618
619    let mut downstream_fragments = HashMap::new();
620    let mut downstream_relations = HashMap::new();
621    for relation in downstreams {
622        let source_fragment_id = relation.source_fragment_id as FragmentId;
623        let target_fragment_id = relation.target_fragment_id as FragmentId;
624        downstream_fragments
625            .entry(source_fragment_id)
626            .or_insert(HashMap::new())
627            .insert(target_fragment_id, relation.dispatcher_type);
628        downstream_relations.insert((source_fragment_id, target_fragment_id), relation);
629    }
630
631    Ok(RescheduleContext {
632        loaded,
633        job_extra_info,
634        upstream_fragments,
635        downstream_fragments,
636        downstream_relations,
637    })
638}
639
640/// Build a `Reschedule` by diffing the previously materialized fragment state against
641/// the newly rendered actor layout.
642///
643/// This function assumes a full rebuild (no kept actors) and produces:
644/// - actor additions/removals and vnode bitmap updates
645/// - dispatcher updates for upstream/downstream fragments
646/// - updated split assignments for source actors
647///
648/// `upstream_fragments`/`downstream_fragments` describe neighbor fragments and dispatcher types,
649/// while `all_actor_dispatchers` contains the new dispatcher list for each actor. `job_extra_info`
650/// supplies job-level context for building new actors.
651fn diff_fragment(
652    prev_fragment_info: &InflightFragmentInfo,
653    curr_actors: &HashMap<ActorId, InflightActorInfo>,
654    upstream_fragments: HashMap<FragmentId, DispatcherType>,
655    downstream_fragments: HashMap<FragmentId, DispatcherType>,
656    all_actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
657    job_extra_info: Option<&StreamingJobExtraInfo>,
658) -> MetaResult<Reschedule> {
659    let prev_ids: HashSet<_> = prev_fragment_info.actors.keys().cloned().collect();
660    let curr_ids: HashSet<_> = curr_actors.keys().cloned().collect();
661
662    let removed_actors: HashSet<_> = &prev_ids - &curr_ids;
663    let added_actor_ids: HashSet<_> = &curr_ids - &prev_ids;
664    let kept_ids: HashSet<_> = prev_ids.intersection(&curr_ids).cloned().collect();
665    debug_assert!(
666        kept_ids.is_empty(),
667        "kept actors found in scale; expected full rebuild, prev={prev_ids:?}, curr={curr_ids:?}, kept={kept_ids:?}"
668    );
669
670    let mut added_actors = HashMap::new();
671    for &actor_id in &added_actor_ids {
672        let InflightActorInfo { worker_id, .. } = curr_actors
673            .get(&actor_id)
674            .ok_or_else(|| anyhow!("BUG: Worker not found for new actor {}", actor_id))?;
675
676        added_actors
677            .entry(*worker_id)
678            .or_insert_with(Vec::new)
679            .push(actor_id);
680    }
681
682    let mut vnode_bitmap_updates = HashMap::new();
683    for actor_id in kept_ids {
684        let prev_actor = &prev_fragment_info.actors[&actor_id];
685        let curr_actor = &curr_actors[&actor_id];
686
687        // Check if the vnode distribution has changed.
688        if prev_actor.vnode_bitmap != curr_actor.vnode_bitmap
689            && let Some(bitmap) = curr_actor.vnode_bitmap.clone()
690        {
691            vnode_bitmap_updates.insert(actor_id, bitmap);
692        }
693    }
694
695    let upstream_dispatcher_mapping =
696        if let DistributionType::Hash = prev_fragment_info.distribution_type {
697            let actor_mapping = curr_actors
698                .iter()
699                .map(
700                    |(
701                        actor_id,
702                        InflightActorInfo {
703                            worker_id: _,
704                            vnode_bitmap,
705                            ..
706                        },
707                    )| { (*actor_id, vnode_bitmap.clone().unwrap()) },
708                )
709                .collect();
710            Some(ActorMapping::from_bitmaps(&actor_mapping))
711        } else {
712            None
713        };
714
715    let upstream_fragment_dispatcher_ids = upstream_fragments
716        .iter()
717        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
718        .map(|(upstream_fragment, _)| (*upstream_fragment, prev_fragment_info.fragment_id))
719        .collect();
720
721    let downstream_fragment_ids = downstream_fragments
722        .iter()
723        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
724        .map(|(fragment_id, _)| *fragment_id)
725        .collect();
726
727    let extra_info = job_extra_info.cloned().unwrap_or_default();
728    let expr_context = extra_info.stream_context().to_expr_context();
729    let job_definition = extra_info.job_definition;
730    let config_override = extra_info.config_override;
731
732    let newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)> =
733        added_actor_ids
734            .iter()
735            .map(|actor_id| {
736                let actor = StreamActor {
737                    actor_id: *actor_id,
738                    fragment_id: prev_fragment_info.fragment_id,
739                    vnode_bitmap: curr_actors[actor_id].vnode_bitmap.clone(),
740                    mview_definition: job_definition.clone(),
741                    expr_context: Some(expr_context.clone()),
742                    config_override: config_override.clone(),
743                };
744                (
745                    *actor_id,
746                    (
747                        (
748                            actor,
749                            all_actor_dispatchers
750                                .get(actor_id)
751                                .cloned()
752                                .unwrap_or_default(),
753                        ),
754                        curr_actors[actor_id].worker_id,
755                    ),
756                )
757            })
758            .collect();
759
760    let actor_splits = curr_actors
761        .iter()
762        .map(|(&actor_id, info)| (actor_id, info.splits.clone()))
763        .collect();
764
765    let reschedule = Reschedule {
766        added_actors,
767        removed_actors,
768        vnode_bitmap_updates,
769        upstream_fragment_dispatcher_ids,
770        upstream_dispatcher_mapping,
771        downstream_fragment_ids,
772        actor_splits,
773        newly_created_actors,
774    };
775
776    Ok(reschedule)
777}
778
779/// Build executable reschedule plans from the rendered fragment layout.
780///
781/// Callers are expected to run the no-op layout check before invoking this
782/// function so command construction stays focused on plan materialization.
783pub(crate) fn build_reschedule_commands(
784    render_result: FragmentRenderMap,
785    context: RescheduleContext,
786    all_prev_fragments: HashMap<FragmentId, &InflightFragmentInfo>,
787) -> MetaResult<HashMap<DatabaseId, ReschedulePlan>> {
788    if render_result.is_empty() {
789        return Ok(HashMap::new());
790    }
791
792    let RescheduleContext {
793        job_extra_info,
794        upstream_fragments: mut all_upstream_fragments,
795        downstream_fragments: mut all_downstream_fragments,
796        mut downstream_relations,
797        ..
798    } = context;
799
800    let fragment_ids = render_result
801        .values()
802        .flat_map(|jobs| jobs.values())
803        .flatten()
804        .map(|(fragment_id, _)| *fragment_id)
805        .collect_vec();
806
807    let all_related_fragment_ids: HashSet<_> = fragment_ids
808        .iter()
809        .copied()
810        .chain(all_upstream_fragments.values().flatten().map(|(id, _)| *id))
811        .chain(
812            all_downstream_fragments
813                .values()
814                .flatten()
815                .map(|(id, _)| *id),
816        )
817        .collect();
818
819    for fragment_id in all_related_fragment_ids {
820        if !all_prev_fragments.contains_key(&fragment_id) {
821            return Err(MetaError::from(anyhow!(
822                "previous fragment info for {fragment_id} not found"
823            )));
824        }
825    }
826
827    let all_rendered_fragments: HashMap<_, _> = render_result
828        .values()
829        .flat_map(|jobs| jobs.values())
830        .flatten()
831        .map(|(fragment_id, info)| (*fragment_id, info))
832        .collect();
833
834    let mut commands = HashMap::new();
835
836    for (database_id, jobs) in &render_result {
837        let mut all_fragment_actors = HashMap::new();
838        let mut reschedules = HashMap::new();
839
840        for (job_id, fragment_id, fragment_info) in jobs.iter().flat_map(|(job_id, fragments)| {
841            fragments
842                .iter()
843                .map(move |(fragment_id, info)| (job_id, fragment_id, info))
844        }) {
845            let InflightFragmentInfo {
846                distribution_type,
847                actors,
848                ..
849            } = fragment_info;
850
851            let upstream_fragments = all_upstream_fragments
852                .remove(&(*fragment_id as FragmentId))
853                .unwrap_or_default();
854            let downstream_fragments = all_downstream_fragments
855                .remove(&(*fragment_id as FragmentId))
856                .unwrap_or_default();
857
858            let fragment_actors: HashMap<_, _> = upstream_fragments
859                .keys()
860                .copied()
861                .chain(downstream_fragments.keys().copied())
862                .map(|fragment_id| {
863                    all_prev_fragments
864                        .get(&fragment_id)
865                        .map(|fragment| {
866                            (
867                                fragment_id,
868                                fragment.actors.keys().copied().collect::<HashSet<_>>(),
869                            )
870                        })
871                        .ok_or_else(|| {
872                            MetaError::from(anyhow!(
873                                "fragment {} not found in previous state",
874                                fragment_id
875                            ))
876                        })
877                })
878                .collect::<MetaResult<_>>()?;
879
880            all_fragment_actors.extend(fragment_actors);
881
882            let source_fragment_actors = actors
883                .iter()
884                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
885                .collect();
886
887            let mut all_actor_dispatchers: HashMap<_, Vec<_>> = HashMap::new();
888
889            for downstream_fragment_id in downstream_fragments.keys() {
890                let target_fragment_actors =
891                    match all_rendered_fragments.get(downstream_fragment_id) {
892                        None => {
893                            let external_fragment = all_prev_fragments
894                                .get(downstream_fragment_id)
895                                .ok_or_else(|| {
896                                    MetaError::from(anyhow!(
897                                        "fragment {} not found in previous state",
898                                        downstream_fragment_id
899                                    ))
900                                })?;
901
902                            external_fragment
903                                .actors
904                                .iter()
905                                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
906                                .collect()
907                        }
908                        Some(downstream_rendered) => downstream_rendered
909                            .actors
910                            .iter()
911                            .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
912                            .collect(),
913                    };
914
915                let target_fragment_distribution = *distribution_type;
916
917                let fragment_relation::Model {
918                    source_fragment_id: _,
919                    target_fragment_id: _,
920                    dispatcher_type,
921                    dist_key_indices,
922                    output_indices,
923                    output_type_mapping,
924                } = downstream_relations
925                    .remove(&(
926                        *fragment_id as FragmentId,
927                        *downstream_fragment_id as FragmentId,
928                    ))
929                    .ok_or_else(|| {
930                        MetaError::from(anyhow!(
931                            "downstream relation missing for {} -> {}",
932                            fragment_id,
933                            downstream_fragment_id
934                        ))
935                    })?;
936
937                let pb_mapping = PbDispatchOutputMapping {
938                    indices: output_indices.into_u32_array(),
939                    types: output_type_mapping.unwrap_or_default().to_protobuf(),
940                };
941
942                let (dispatchers, _) = compose_dispatchers(
943                    *distribution_type,
944                    &source_fragment_actors,
945                    *downstream_fragment_id,
946                    target_fragment_distribution,
947                    &target_fragment_actors,
948                    dispatcher_type,
949                    dist_key_indices.into_u32_array(),
950                    pb_mapping,
951                );
952
953                for (actor_id, dispatcher) in dispatchers {
954                    all_actor_dispatchers
955                        .entry(actor_id)
956                        .or_default()
957                        .push(dispatcher);
958                }
959            }
960
961            let prev_fragment = all_prev_fragments.get(&{ *fragment_id }).ok_or_else(|| {
962                MetaError::from(anyhow!(
963                    "fragment {} not found in previous state",
964                    fragment_id
965                ))
966            })?;
967
968            let reschedule = diff_fragment(
969                prev_fragment,
970                actors,
971                upstream_fragments,
972                downstream_fragments,
973                all_actor_dispatchers,
974                job_extra_info.get(job_id),
975            )?;
976
977            reschedules.insert(*fragment_id as FragmentId, reschedule);
978        }
979
980        let command = ReschedulePlan {
981            reschedules,
982            fragment_actors: all_fragment_actors,
983        };
984
985        debug_assert!(
986            command
987                .reschedules
988                .values()
989                .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
990            "reschedule plan carries vnode_bitmap_updates, expected full rebuild"
991        );
992
993        commands.insert(*database_id, command);
994    }
995
996    Ok(commands)
997}
998
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_connector::source::SplitImpl;
    use risingwave_connector::source::test_source::TestSourceSplit;

    use super::*;

    /// Vnode bitmap over the 8 vnodes used by every fixture fragment, with `indices` set.
    fn vnodes(indices: &[usize]) -> Option<Bitmap> {
        Some(Bitmap::from_indices(8, indices))
    }

    /// Actor placed on `worker_id` owning `vnode_bitmap`, with no source splits.
    fn actor(worker_id: impl Into<WorkerId>, vnode_bitmap: Option<Bitmap>) -> InflightActorInfo {
        actor_with_splits(worker_id, vnode_bitmap, Vec::new())
    }

    /// Actor placed on `worker_id` owning `vnode_bitmap` and the given source `splits`.
    fn actor_with_splits(
        worker_id: impl Into<WorkerId>,
        vnode_bitmap: Option<Bitmap>,
        splits: Vec<SplitImpl>,
    ) -> InflightActorInfo {
        let worker_id = worker_id.into();
        InflightActorInfo {
            worker_id,
            vnode_bitmap,
            splits,
        }
    }

    /// Test-source split named `id` positioned at `offset`.
    fn test_split(id: &str, offset: &str) -> SplitImpl {
        let split = TestSourceSplit {
            id: id.into(),
            properties: HashMap::new(),
            offset: offset.to_owned(),
        };
        SplitImpl::Test(split)
    }

    /// Hash-distributed fragment over 8 vnodes made of `actors`.
    fn fragment(
        fragment_id: FragmentId,
        actors: impl IntoIterator<Item = (ActorId, InflightActorInfo)>,
    ) -> InflightFragmentInfo {
        let actors = actors.into_iter().collect();
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 8,
            nodes: Default::default(),
            actors,
            state_table_ids: HashSet::new(),
        }
    }

    /// Wraps `fragments` into a render result for database 1 / job 1.
    fn render_result(fragments: Vec<(FragmentId, InflightFragmentInfo)>) -> FragmentRenderMap {
        let job_fragments: HashMap<_, _> = fragments.into_iter().collect();
        let jobs = HashMap::from([(JobId::new(1), job_fragments)]);
        HashMap::from([(DatabaseId::new(1), jobs)])
    }

    /// Current layout shared by the layout-comparison tests: fragment 1 on workers 1/2
    /// and fragment 2 on workers 3/4, each splitting the vnodes evenly.
    fn current_fragments() -> (InflightFragmentInfo, InflightFragmentInfo) {
        let source = fragment(
            FragmentId::new(1),
            [
                (ActorId::new(101), actor(1, vnodes(&[0, 1, 2, 3]))),
                (ActorId::new(102), actor(2, vnodes(&[4, 5, 6, 7]))),
            ],
        );
        let target = fragment(
            FragmentId::new(2),
            [
                (ActorId::new(201), actor(3, vnodes(&[0, 1, 2, 3]))),
                (ActorId::new(202), actor(4, vnodes(&[4, 5, 6, 7]))),
            ],
        );
        (source, target)
    }

    /// Rendered counterpart of fragment 1: fresh actor ids, unchanged placement.
    fn rendered_source() -> InflightFragmentInfo {
        fragment(
            FragmentId::new(1),
            [
                (ActorId::new(1001), actor(1, vnodes(&[0, 1, 2, 3]))),
                (ActorId::new(1002), actor(2, vnodes(&[4, 5, 6, 7]))),
            ],
        )
    }

    #[test]
    fn rendered_layout_matches_current_ignores_actor_ids_across_fragments() {
        let (current_source, current_target) = current_fragments();

        let rendered_target = fragment(
            FragmentId::new(2),
            [
                (ActorId::new(2001), actor(3, vnodes(&[0, 1, 2, 3]))),
                (ActorId::new(2002), actor(4, vnodes(&[4, 5, 6, 7]))),
            ],
        );
        let rendered = render_result(vec![
            (FragmentId::new(1), rendered_source()),
            (FragmentId::new(2), rendered_target),
        ]);

        let prev = HashMap::from([
            (FragmentId::new(1), &current_source),
            (FragmentId::new(2), &current_target),
        ]);

        assert!(rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    #[test]
    fn rendered_layout_matches_current_detects_target_fragment_layout_changes() {
        let (current_source, current_target) = current_fragments();

        // Same workers as before, but the vnode split of fragment 2 is now uneven.
        let rendered_target = fragment(
            FragmentId::new(2),
            [
                (ActorId::new(2001), actor(3, vnodes(&[0, 1, 2, 3, 4, 5]))),
                (ActorId::new(2002), actor(4, vnodes(&[6, 7]))),
            ],
        );
        let rendered = render_result(vec![
            (FragmentId::new(1), rendered_source()),
            (FragmentId::new(2), rendered_target),
        ]);

        let prev = HashMap::from([
            (FragmentId::new(1), &current_source),
            (FragmentId::new(2), &current_target),
        ]);

        assert!(!rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    #[test]
    fn rendered_layout_matches_current_ignores_split_offsets() {
        let fragment_id = FragmentId::new(1);
        let split_actor = |offset: &str| {
            actor_with_splits(1, vnodes(&[0, 1, 2, 3]), vec![test_split("split-0", offset)])
        };

        let prev_fragment = fragment(fragment_id, [(ActorId::new(101), split_actor("100"))]);
        let rendered = render_result(vec![(
            fragment_id,
            fragment(fragment_id, [(ActorId::new(1001), split_actor("200"))]),
        )]);
        let prev = HashMap::from([(fragment_id, &prev_fragment)]);

        assert!(rendered_layout_matches_current(&rendered, &prev).unwrap());
    }
}
1240
1241#[derive(Clone, Debug, Eq, PartialEq)]
1242pub struct ParallelismPolicy {
1243    pub parallelism: StreamingParallelism,
1244    pub adaptive_parallelism_strategy: Option<String>,
1245}
1246
/// Resource-group setting to apply when rescheduling a streaming job.
#[derive(Debug, Clone)]
pub struct ResourceGroupPolicy {
    /// Target resource group; `None` when no specific group is requested.
    pub resource_group: Option<String>,
}
1251
1252#[derive(Clone, Debug)]
1253pub enum ReschedulePolicy {
1254    Parallelism(ParallelismPolicy),
1255    ResourceGroup(ResourceGroupPolicy),
1256    Both(ParallelismPolicy, ResourceGroupPolicy),
1257}
1258
1259impl GlobalStreamManager {
1260    #[await_tree::instrument("acquire_reschedule_read_guard")]
1261    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
1262        self.scale_controller.reschedule_lock.read().await
1263    }
1264
1265    #[await_tree::instrument("acquire_reschedule_write_guard")]
1266    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
1267        self.scale_controller.reschedule_lock.write().await
1268    }
1269
1270    /// When new worker nodes joined, or the parallelism of existing worker nodes changed,
1271    /// examines if there are any jobs can be scaled, and scales them if found.
1272    ///
1273    /// This method will iterate over all `CREATED` jobs, and can be repeatedly called.
1274    ///
1275    /// Returns
1276    /// - `Ok(false)` if no jobs can be scaled;
1277    /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled.
1278    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
1279        tracing::info!("trigger parallelism control");
1280
1281        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
1282
1283        let background_streaming_jobs = self
1284            .metadata_manager
1285            .list_background_creating_jobs()
1286            .await?;
1287
1288        let blocked_jobs = self
1289            .metadata_manager
1290            .collect_reschedule_blocked_jobs_for_creating_jobs(&background_streaming_jobs, true)
1291            .await?;
1292        let has_blocked_jobs = !blocked_jobs.is_empty();
1293
1294        let database_objects: HashMap<risingwave_meta_model::DatabaseId, Vec<JobId>> = self
1295            .metadata_manager
1296            .catalog_controller
1297            .list_streaming_job_with_database()
1298            .await?;
1299
1300        let job_ids = database_objects
1301            .iter()
1302            .flat_map(|(database_id, job_ids)| {
1303                job_ids
1304                    .iter()
1305                    .enumerate()
1306                    .map(move |(idx, job_id)| (idx, database_id, job_id))
1307            })
1308            .sorted_by(|(idx_a, database_a, _), (idx_b, database_b, _)| {
1309                idx_a.cmp(idx_b).then(database_a.cmp(database_b))
1310            })
1311            .map(|(_, database_id, job_id)| (*database_id, *job_id))
1312            .filter(|(_, job_id)| !blocked_jobs.contains(job_id))
1313            .collect_vec();
1314
1315        if job_ids.is_empty() {
1316            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
1317            // Retry periodically while some jobs are temporarily blocked by creating
1318            // unreschedulable backfill jobs. This allows us to scale them
1319            // automatically once the creating jobs finish.
1320            return Ok(has_blocked_jobs);
1321        }
1322
1323        let active_workers =
1324            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;
1325
1326        tracing::info!(
1327            "trigger parallelism control for jobs: {:#?}, workers {:#?}",
1328            job_ids,
1329            active_workers.current()
1330        );
1331
1332        let batch_size = match self.env.opts.parallelism_control_batch_size {
1333            0 => job_ids.len(),
1334            n => n,
1335        };
1336
1337        tracing::info!(
1338            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
1339            job_ids.len(),
1340            batch_size,
1341            active_workers.current()
1342        );
1343
1344        let batches: Vec<_> = job_ids
1345            .into_iter()
1346            .chunks(batch_size)
1347            .into_iter()
1348            .map(|chunk| chunk.collect_vec())
1349            .collect();
1350
1351        for batch in batches {
1352            let jobs = batch.iter().map(|(_, job_id)| *job_id).collect();
1353
1354            let commands = self.scale_controller.rerender(jobs).await?;
1355
1356            let futures = commands.into_iter().map(|(database_id, command)| {
1357                let barrier_scheduler = self.barrier_scheduler.clone();
1358                async move { barrier_scheduler.run_command(database_id, command).await }
1359            });
1360
1361            let _results = future::try_join_all(futures).await?;
1362        }
1363
1364        Ok(has_blocked_jobs)
1365    }
1366
1367    /// Handles notification of worker node activation and deletion, and triggers parallelism control.
1368    async fn run(&self, mut shutdown_rx: Receiver<()>) {
1369        tracing::info!("starting automatic parallelism control monitor");
1370
1371        let check_period =
1372            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
1373
1374        let mut ticker = tokio::time::interval_at(
1375            Instant::now()
1376                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
1377            check_period,
1378        );
1379        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
1380
1381        let (local_notification_tx, mut local_notification_rx) =
1382            tokio::sync::mpsc::unbounded_channel();
1383
1384        self.env
1385            .notification_manager()
1386            .insert_local_sender(local_notification_tx);
1387
1388        // waiting for the first tick
1389        ticker.tick().await;
1390
1391        let worker_nodes = self
1392            .metadata_manager
1393            .list_active_streaming_compute_nodes()
1394            .await
1395            .expect("list active streaming compute nodes");
1396
1397        let mut worker_cache: BTreeMap<_, _> = worker_nodes
1398            .into_iter()
1399            .map(|worker| (worker.id, worker))
1400            .collect();
1401
1402        let mut should_trigger = false;
1403
1404        loop {
1405            tokio::select! {
1406                biased;
1407
1408                _ = &mut shutdown_rx => {
1409                    tracing::info!("Stream manager is stopped");
1410                    break;
1411                }
1412
1413                _ = ticker.tick(), if should_trigger => {
1414                    let include_workers = worker_cache.keys().copied().collect_vec();
1415
1416                    if include_workers.is_empty() {
1417                        tracing::debug!("no available worker nodes");
1418                        should_trigger = false;
1419                        continue;
1420                    }
1421
1422                    match self.trigger_parallelism_control().await {
1423                        Ok(cont) => {
1424                            should_trigger = cont;
1425                        }
1426                        Err(e) => {
1427                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
1428                            ticker.reset();
1429                        }
1430                    }
1431                }
1432
1433                notification = local_notification_rx.recv() => {
1434                    let notification = notification.expect("local notification channel closed in loop of stream manager");
1435
1436                    // Only maintain the cache for streaming compute nodes.
1437                    let worker_is_streaming_compute = |worker: &WorkerNode| {
1438                        worker.get_type() == Ok(WorkerType::ComputeNode)
1439                            && worker.property.as_ref().unwrap().is_streaming
1440                    };
1441
1442                    match notification {
1443                        // TODO(#25478): add a local notification path for relevant session
1444                        // parameter updates and trigger a control round here when the cluster
1445                        // defaults for stream/backfill parallelism change.
1446                        LocalNotification::SystemParamsChange(_) => {}
1447                        LocalNotification::WorkerNodeActivated(worker) => {
1448                            if !worker_is_streaming_compute(&worker) {
1449                                continue;
1450                            }
1451
1452                            tracing::info!(worker = %worker.id, "worker activated notification received");
1453
1454                            let prev_worker = worker_cache.insert(worker.id, worker.clone());
1455
1456                            match prev_worker {
1457                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism()  => {
1458                                    tracing::info!(worker = %worker.id, "worker parallelism changed");
1459                                    should_trigger = true;
1460                                }
1461                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group()  => {
1462                                    tracing::info!(worker = %worker.id, "worker label changed");
1463                                    should_trigger = true;
1464                                }
1465                                None => {
1466                                    tracing::info!(worker = %worker.id, "new worker joined");
1467                                    should_trigger = true;
1468                                }
1469                                _ => {}
1470                            }
1471                        }
1472
1473                        // Since our logic for handling passive scale-in is within the barrier manager,
1474                        // there’s not much we can do here. All we can do is proactively remove the entries from our cache.
1475                        LocalNotification::WorkerNodeDeleted(worker) => {
1476                            if !worker_is_streaming_compute(&worker) {
1477                                continue;
1478                            }
1479
1480                            match worker_cache.remove(&worker.id) {
1481                                Some(prev_worker) => {
1482                                    tracing::info!(worker = %prev_worker.id, "worker removed from stream manager cache");
1483                                }
1484                                None => {
1485                                    tracing::warn!(worker = %worker.id, "worker not found in stream manager cache, but it was removed");
1486                                }
1487                            }
1488                        }
1489
1490                        LocalNotification::StreamingJobBackfillFinished(job_id) => {
1491                            tracing::debug!(job_id = %job_id, "received backfill finished notification");
1492                            if let Err(e) = self.apply_post_backfill_parallelism(job_id).await {
1493                                tracing::warn!(job_id = %job_id, error = %e.as_report(), "failed to restore parallelism after backfill");
1494                                // Retry in the next periodic tick. This avoids triggering
1495                                // unnecessary full control passes when restore succeeds.
1496                                should_trigger = true;
1497                            }
1498                        }
1499
1500                        _ => {}
1501                    }
1502                }
1503            }
1504        }
1505    }
1506
1507    /// Restores a streaming job's parallelism to its target value after backfill completes.
1508    async fn apply_post_backfill_parallelism(&self, job_id: JobId) -> MetaResult<()> {
1509        // Fetch both the target parallelism (final desired state) and the backfill parallelism
1510        // (temporary parallelism used during backfill phase) from the catalog.
1511        let Some((
1512            target,
1513            target_adaptive_parallelism_strategy,
1514            backfill_parallelism,
1515            backfill_adaptive_parallelism_strategy,
1516        )) = self
1517            .metadata_manager
1518            .catalog_controller
1519            .get_job_parallelisms(job_id)
1520            .await?
1521        else {
1522            // The job may have been dropped before this notification is processed.
1523            // Treat it as a benign no-op so we don't trigger unnecessary retries.
1524            tracing::debug!(
1525                job_id = %job_id,
1526                "streaming job not found when applying post-backfill parallelism, skip"
1527            );
1528            return Ok(());
1529        };
1530
1531        // Determine if we need to reschedule based on the backfill configuration.
1532        match backfill_parallelism {
1533            Some(backfill_parallelism)
1534                if backfill_parallelism == target
1535                    && backfill_adaptive_parallelism_strategy
1536                        == target_adaptive_parallelism_strategy =>
1537            {
1538                // Backfill parallelism matches target - no reschedule needed since the job
1539                // is already running at the desired parallelism.
1540                tracing::debug!(
1541                    job_id = %job_id,
1542                    ?backfill_parallelism,
1543                    ?target,
1544                    "backfill parallelism equals job parallelism, skip reschedule"
1545                );
1546                return Ok(());
1547            }
1548            Some(_) => {
1549                // Backfill parallelism differs from target - proceed to restore target parallelism.
1550            }
1551            None => {
1552                // No backfill parallelism was configured, meaning the job was created without
1553                // a special backfill override. No reschedule is necessary.
1554                tracing::debug!(
1555                    job_id = %job_id,
1556                    ?target,
1557                    "no backfill parallelism configured, skip post-backfill reschedule"
1558                );
1559                return Ok(());
1560            }
1561        }
1562
1563        // Reschedule the job to restore its target parallelism.
1564        tracing::info!(
1565            job_id = %job_id,
1566            ?target,
1567            ?backfill_parallelism,
1568            "restoring parallelism after backfill via reschedule"
1569        );
1570        let policy = ReschedulePolicy::Parallelism(ParallelismPolicy {
1571            parallelism: target,
1572            adaptive_parallelism_strategy: target_adaptive_parallelism_strategy,
1573        });
1574        if let Err(e) = self.reschedule_streaming_job(job_id, policy, false).await {
1575            tracing::warn!(job_id = %job_id, error = %e.as_report(), "reschedule after backfill failed");
1576            return Err(e);
1577        }
1578
1579        tracing::info!(job_id = %job_id, "parallelism reschedule after backfill submitted");
1580        Ok(())
1581    }
1582
1583    pub fn start_auto_parallelism_monitor(
1584        self: Arc<Self>,
1585    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
1586        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
1587        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1588        let join_handle = tokio::spawn(async move {
1589            self.run(shutdown_rx).await;
1590        });
1591
1592        (join_handle, shutdown_tx)
1593    }
1594}