risingwave_meta/stream/
scale.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::future;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::DatabaseId;
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt};
25use risingwave_common::system_param::reader::SystemParamsRead;
26use risingwave_connector::source::{SplitId, SplitMetaData};
27use risingwave_meta_model::{
28    StreamingParallelism, WorkerId, fragment, fragment_relation, object, streaming_job,
29};
30use risingwave_pb::common::{WorkerNode, WorkerType};
31use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher};
32use sea_orm::{ConnectionTrait, JoinType, QuerySelect, RelationTrait};
33use thiserror_ext::AsReport;
34use tokio::sync::oneshot::Receiver;
35use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
36use tokio::task::JoinHandle;
37use tokio::time::{Instant, MissedTickBehavior};
38
39use crate::barrier::{Command, Reschedule, RescheduleContext, ReschedulePlan};
40use crate::controller::scale::{
41    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble,
42    find_fragment_no_shuffle_dags_detailed, load_fragment_context, load_fragment_context_for_jobs,
43};
44use crate::error::bail_invalid_parameter;
45use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager};
46use crate::model::{ActorId, FragmentId, StreamActor, StreamActorWithDispatchers};
47use crate::stream::{GlobalStreamManager, SourceManagerRef};
48use crate::{MetaError, MetaResult};
49
/// A manual reschedule request expressed as per-worker actor-count deltas.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    /// Requested change in actor count per worker: positive adds actors on
    /// that worker, negative removes them.
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}
54
55use risingwave_common::id::JobId;
56use risingwave_common::system_param::AdaptiveParallelismStrategy;
57use risingwave_meta_model::DispatcherType;
58use risingwave_meta_model::fragment::DistributionType;
59use risingwave_meta_model::prelude::{Fragment, FragmentRelation, StreamingJob};
60use sea_orm::ActiveValue::Set;
61use sea_orm::{
62    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
63};
64
65use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
66use crate::controller::utils::{
67    StreamingJobExtraInfo, compose_dispatchers, get_streaming_job_extra_info,
68};
69
/// Shared, reference-counted handle to a [`ScaleController`].
pub type ScaleControllerRef = Arc<ScaleController>;
71
// Field order is intentional: derived `Ord` is used to canonicalize actor
// layout for no-op detection, so new fields must preserve meaningful
// comparison order.
/// Actor layout with the actor id erased, used for structural comparison of
/// fragment layouts.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct NormalizedActor {
    /// Worker the actor is placed on.
    worker_id: WorkerId,
    /// Indices of owned vnodes derived from the actor's vnode bitmap, if any.
    vnode_bitmap: Option<Vec<usize>>,
    /// Source split ids assigned to the actor, sorted for stable comparison.
    splits: Vec<SplitId>,
}
81
/// Canonical, actor-id-free representation of a fragment's layout. Two layouts
/// compare equal iff the fragment's placement is semantically unchanged.
#[derive(Clone, Debug, PartialEq, Eq)]
struct NormalizedFragmentLayout {
    distribution_type: DistributionType,
    /// Normalized actors, sorted so comparison is insensitive to iteration order.
    actors: Vec<NormalizedActor>,
}
87
88fn normalize_actor_info(info: &InflightActorInfo) -> NormalizedActor {
89    let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
90        bitmap
91            .iter_vnodes()
92            .map(|vnode| vnode.to_index())
93            .collect_vec()
94    });
95    let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
96    splits.sort_unstable();
97    NormalizedActor {
98        worker_id: info.worker_id,
99        vnode_bitmap,
100        splits,
101    }
102}
103
104fn build_normalized_fragment_layout(
105    fragment_info: &InflightFragmentInfo,
106) -> NormalizedFragmentLayout {
107    // Actor ids are intentionally erased here. For no-op detection we only care
108    // about the semantic layout carried by worker placement, vnode ownership,
109    // and split assignment.
110    let mut actors = fragment_info
111        .actors
112        .values()
113        .map(normalize_actor_info)
114        .collect_vec();
115    actors.sort_unstable();
116
117    NormalizedFragmentLayout {
118        distribution_type: fragment_info.distribution_type,
119        actors,
120    }
121}
122
123/// Compare the rendered (preview) layout against the current in-flight layout
124/// to decide whether a reschedule is a no-op.
125///
126/// Dispatcher layout is intentionally not compared here. On the reschedule path
127/// it is a deterministic function of the fragment actor layout together with the
128/// stable fragment-relation metadata held in `RescheduleContext`, so fragment-level
129/// equality already implies dispatcher equality.
130///
131/// Only fragments present in `render_result` are compared. This is intentional:
132/// this function sits on the reschedule path, which only re-renders fragments
133/// that were loaded into the `RescheduleContext`. Fragment creation or deletion
134/// is handled by separate DDL / recovery paths, not by reschedule, so fragments
135/// outside the render set are irrelevant here. In other words, this is a subset
136/// check over the rendered fragments, not a full bidirectional equality check.
137pub(crate) fn rendered_layout_matches_current(
138    render_result: &FragmentRenderMap,
139    all_prev_fragments: &HashMap<FragmentId, &InflightFragmentInfo>,
140) -> MetaResult<bool> {
141    let all_rendered_fragments: HashMap<_, _> = render_result
142        .values()
143        .flat_map(|jobs| jobs.values())
144        .flatten()
145        .map(|(fragment_id, info)| (*fragment_id, info))
146        .collect();
147
148    for (fragment_id, rendered_fragment) in &all_rendered_fragments {
149        let Some(prev_fragment) = all_prev_fragments.get(fragment_id).copied() else {
150            return Err(MetaError::from(anyhow!(
151                "previous fragment info for {} not found",
152                fragment_id
153            )));
154        };
155
156        let rendered_layout = build_normalized_fragment_layout(rendered_fragment);
157        let current_layout = build_normalized_fragment_layout(prev_fragment);
158
159        if rendered_layout != current_layout {
160            return Ok(false);
161        }
162    }
163
164    Ok(true)
165}
166
/// Coordinates scaling (reschedule) operations on streaming jobs, persisting
/// target parallelism to the catalog and producing reschedule-intent commands.
pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    /// We will acquire lock during DDL to prevent scaling operations on jobs that are in the creating state.
    /// e.g., a MV cannot be rescheduled during foreground backfill.
    pub reschedule_lock: RwLock<()>,
}
178
impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    /// Expand `jobs` to all jobs reachable through no-shuffle fragment
    /// ensembles. Jobs linked by no-shuffle edges must be rescheduled
    /// together, so callers use this to widen a reschedule request.
    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[JobId],
    ) -> MetaResult<HashSet<JobId>> {
        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        // All fragments owned by the requested jobs.
        let fragment_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(jobs.to_vec()))
            .into_tuple()
            .all(&txn)
            .await?;
        // Walk the no-shuffle DAGs these fragments participate in, collecting
        // every fragment of every touched ensemble.
        let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
        let related_fragments = ensembles
            .iter()
            .flat_map(|ensemble| ensemble.fragments())
            .collect_vec();

        // Map the related fragments back to their owning jobs.
        let job_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(related_fragments))
            .into_tuple()
            .all(&txn)
            .await?;

        // Deduplicate via the HashSet return type.
        let job_ids = job_ids.into_iter().collect();

        Ok(job_ids)
    }

    /// Persist new parallelism and/or resource-group settings for the given
    /// jobs and build per-database `RescheduleIntent` commands reflecting the
    /// updated catalog state. The whole operation runs in one transaction.
    pub async fn reschedule_inplace(
        &self,
        policy: HashMap<JobId, ReschedulePolicy>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        for (table_id, target) in &policy {
            let streaming_job = StreamingJob::find_by_id(*table_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;

            let max_parallelism = streaming_job.max_parallelism;

            let mut streaming_job = streaming_job.into_active_model();

            // Apply a parallelism change if the policy carries one, rejecting
            // fixed values above the job's max parallelism (vnode count bound).
            match &target {
                ReschedulePolicy::Parallelism(p) | ReschedulePolicy::Both(p, _) => {
                    if let StreamingParallelism::Fixed(n) = p.parallelism
                        && n > max_parallelism as usize
                    {
                        bail!(format!(
                            "specified parallelism {n} should not exceed max parallelism {max_parallelism}"
                        ));
                    }

                    streaming_job.parallelism = Set(p.parallelism.clone());
                }
                _ => {}
            }

            // Apply a resource-group change if the policy carries one.
            match &target {
                ReschedulePolicy::ResourceGroup(r) | ReschedulePolicy::Both(_, r) => {
                    streaming_job.specific_resource_group = Set(r.resource_group.clone());
                }
                _ => {}
            }

            StreamingJob::update(streaming_job).exec(&txn).await?;
        }

        // Build intent commands from the just-updated catalog state, then
        // commit both together.
        let job_ids: HashSet<JobId> = policy.keys().copied().collect();
        let commands = build_reschedule_intent_for_jobs(&txn, job_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }

    /// Persist new backfill parallelism (or clear it with `None`) for the
    /// given jobs and build the corresponding per-database reschedule-intent
    /// commands, all within a single transaction.
    pub async fn reschedule_backfill_parallelism_inplace(
        &self,
        policy: HashMap<JobId, Option<StreamingParallelism>>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        if policy.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        for (table_id, parallelism) in &policy {
            let streaming_job = StreamingJob::find_by_id(*table_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;

            let max_parallelism = streaming_job.max_parallelism;

            let mut streaming_job = streaming_job.into_active_model();

            // Fixed backfill parallelism is bounded by the job's max parallelism.
            if let Some(StreamingParallelism::Fixed(n)) = parallelism
                && *n > max_parallelism as usize
            {
                bail!(format!(
                    "specified backfill parallelism {n} should not exceed max parallelism {max_parallelism}"
                ));
            }

            streaming_job.backfill_parallelism = Set(parallelism.clone());
            streaming_job.update(&txn).await?;
        }

        let jobs = policy.keys().copied().collect();

        let command = build_reschedule_intent_for_jobs(&txn, jobs).await?;

        txn.commit().await?;

        Ok(command)
    }

    /// Persist fragment-level parallelism overrides and build reschedule
    /// intents for the affected no-shuffle ensembles.
    ///
    /// The policy must address ensemble *entry* fragments; all entry fragments
    /// of one ensemble must agree on the requested parallelism. Ensembles
    /// whose parallelism is already at the requested value are skipped.
    pub async fn reschedule_fragment_inplace(
        &self,
        policy: HashMap<risingwave_meta_model::FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        if policy.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        let fragment_id_list = policy.keys().copied().collect_vec();

        // Existence check: reject the request if any fragment id is unknown.
        let existing_fragment_ids: HashSet<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
            .into_tuple::<risingwave_meta_model::FragmentId>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        if let Some(missing_fragment_id) = fragment_id_list
            .iter()
            .find(|fragment_id| !existing_fragment_ids.contains(*fragment_id))
        {
            return Err(MetaError::catalog_id_not_found(
                "fragment",
                *missing_fragment_id,
            ));
        }

        let mut target_ensembles = vec![];

        for ensemble in find_fragment_no_shuffle_dags_detailed(&txn, &fragment_id_list).await? {
            let entry_fragment_ids = ensemble.entry_fragments().collect_vec();

            // Collect the distinct parallelism targets requested for this
            // ensemble's entry fragments: exactly one is required.
            let desired_parallelism = match entry_fragment_ids
                .iter()
                .filter_map(|fragment_id| policy.get(fragment_id).cloned())
                .dedup()
                .collect_vec()
                .as_slice()
            {
                [] => {
                    bail_invalid_parameter!(
                        "none of the entry fragments {:?} were included in the reschedule request; \
                         provide at least one entry fragment id",
                        entry_fragment_ids
                    );
                }
                [parallelism] => parallelism.clone(),
                parallelisms => {
                    bail!(
                        "conflicting reschedule policies for fragments in the same no-shuffle ensemble: {:?}",
                        parallelisms
                    );
                }
            };

            let fragments = Fragment::find()
                .filter(fragment::Column::FragmentId.is_in(entry_fragment_ids))
                .all(&txn)
                .await?;

            debug_assert!(
                fragments
                    .iter()
                    .map(|fragment| fragment.parallelism.as_ref())
                    .all_equal(),
                "entry fragments in the same ensemble should share the same parallelism"
            );

            let current_parallelism = fragments
                .first()
                .and_then(|fragment| fragment.parallelism.clone());

            // Skip ensembles that are already at the requested parallelism.
            if current_parallelism == desired_parallelism {
                continue;
            }

            for fragment in fragments {
                let mut fragment = fragment.into_active_model();
                fragment.parallelism = Set(desired_parallelism.clone());
                Fragment::update(fragment).exec(&txn).await?;
            }

            target_ensembles.push(ensemble);
        }

        // Nothing changed: commit the (empty) transaction and return no commands.
        if target_ensembles.is_empty() {
            txn.commit().await?;
            return Ok(HashMap::new());
        }

        let target_fragment_ids: HashSet<FragmentId> = target_ensembles
            .iter()
            .flat_map(|ensemble| ensemble.component_fragments())
            .collect();
        let commands = build_reschedule_intent_for_fragments(&txn, target_fragment_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }

    /// Rebuild reschedule-intent commands for `jobs` from current catalog
    /// state, without changing any persisted settings.
    async fn rerender(&self, jobs: HashSet<JobId>) -> MetaResult<HashMap<DatabaseId, Command>> {
        if jobs.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;
        let commands = build_reschedule_intent_for_jobs(&txn, jobs).await?;
        txn.commit().await?;
        Ok(commands)
    }
}
437
/// Build per-database `RescheduleIntent` commands covering the given jobs.
///
/// Verifies that every requested job exists (returning a not-found error
/// listing the missing ids otherwise), loads the reschedule context, and
/// splits it into one command per database.
async fn build_reschedule_intent_for_jobs(
    txn: &impl ConnectionTrait,
    job_ids: HashSet<JobId>,
) -> MetaResult<HashMap<DatabaseId, Command>> {
    if job_ids.is_empty() {
        return Ok(HashMap::new());
    }

    let job_id_list = job_ids.iter().copied().collect_vec();
    // Resolve each job's owning database; doubles as an existence check.
    let database_jobs: Vec<(DatabaseId, JobId)> = StreamingJob::find()
        .select_only()
        .column(object::Column::DatabaseId)
        .column(streaming_job::Column::JobId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_id_list.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    // Fewer rows than requested ids means some jobs were not found.
    if database_jobs.len() != job_ids.len() {
        let returned_jobs: HashSet<JobId> =
            database_jobs.iter().map(|(_, job_id)| *job_id).collect();
        let missing = job_ids.difference(&returned_jobs).copied().collect_vec();
        return Err(MetaError::catalog_id_not_found(
            "streaming job",
            format!("{missing:?}"),
        ));
    }

    let reschedule_context = load_reschedule_context_for_jobs(txn, job_ids).await?;
    if reschedule_context.is_empty() {
        return Ok(HashMap::new());
    }

    // One intent command per database; the concrete plan is rendered later
    // (`reschedule_plan: None`).
    let commands = reschedule_context
        .into_database_contexts()
        .into_iter()
        .map(|(database_id, context)| {
            (
                database_id,
                Command::RescheduleIntent {
                    context,
                    reschedule_plan: None,
                },
            )
        })
        .collect();

    Ok(commands)
}
488
489async fn build_reschedule_intent_for_fragments(
490    txn: &impl ConnectionTrait,
491    fragment_ids: HashSet<FragmentId>,
492) -> MetaResult<HashMap<DatabaseId, Command>> {
493    if fragment_ids.is_empty() {
494        return Ok(HashMap::new());
495    }
496
497    let fragment_id_list = fragment_ids.iter().copied().collect_vec();
498    let fragment_databases: Vec<(FragmentId, DatabaseId)> = Fragment::find()
499        .select_only()
500        .column(fragment::Column::FragmentId)
501        .column(object::Column::DatabaseId)
502        .join(JoinType::LeftJoin, fragment::Relation::Object.def())
503        .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
504        .into_tuple()
505        .all(txn)
506        .await?;
507
508    if fragment_databases.len() != fragment_ids.len() {
509        let returned: HashSet<FragmentId> = fragment_databases
510            .iter()
511            .map(|(fragment_id, _)| *fragment_id)
512            .collect();
513        let missing = fragment_ids.difference(&returned).copied().collect_vec();
514        return Err(MetaError::catalog_id_not_found(
515            "fragment",
516            format!("{missing:?}"),
517        ));
518    }
519
520    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_id_list).await?;
521    let reschedule_context = load_reschedule_context_for_ensembles(txn, ensembles).await?;
522    if reschedule_context.is_empty() {
523        return Ok(HashMap::new());
524    }
525
526    let commands = reschedule_context
527        .into_database_contexts()
528        .into_iter()
529        .map(|(database_id, context)| {
530            (
531                database_id,
532                Command::RescheduleIntent {
533                    context,
534                    reschedule_plan: None,
535                },
536            )
537        })
538        .collect();
539
540    Ok(commands)
541}
542
543async fn load_reschedule_context_for_jobs(
544    txn: &impl ConnectionTrait,
545    job_ids: HashSet<JobId>,
546) -> MetaResult<RescheduleContext> {
547    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
548    build_reschedule_context_from_loaded(txn, loaded).await
549}
550
551async fn load_reschedule_context_for_ensembles(
552    txn: &impl ConnectionTrait,
553    ensembles: Vec<NoShuffleEnsemble>,
554) -> MetaResult<RescheduleContext> {
555    let loaded = load_fragment_context(txn, ensembles).await?;
556    build_reschedule_context_from_loaded(txn, loaded).await
557}
558
559async fn build_reschedule_context_from_loaded(
560    txn: &impl ConnectionTrait,
561    loaded: LoadedFragmentContext,
562) -> MetaResult<RescheduleContext> {
563    if loaded.is_empty() {
564        return Ok(RescheduleContext::empty());
565    }
566
567    let job_ids = loaded.job_map.keys().copied().collect_vec();
568    let job_extra_info = get_streaming_job_extra_info(txn, job_ids).await?;
569
570    let fragment_ids = loaded
571        .job_fragments
572        .values()
573        .flat_map(|fragments| fragments.keys().copied())
574        .collect_vec();
575
576    let upstreams: Vec<(FragmentId, FragmentId, DispatcherType)> = FragmentRelation::find()
577        .select_only()
578        .columns([
579            fragment_relation::Column::TargetFragmentId,
580            fragment_relation::Column::SourceFragmentId,
581            fragment_relation::Column::DispatcherType,
582        ])
583        .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
584        .into_tuple()
585        .all(txn)
586        .await?;
587
588    let mut upstream_fragments = HashMap::new();
589    for (fragment, upstream, dispatcher) in upstreams {
590        upstream_fragments
591            .entry(fragment as FragmentId)
592            .or_insert(HashMap::new())
593            .insert(upstream as FragmentId, dispatcher);
594    }
595
596    let downstreams = FragmentRelation::find()
597        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids.clone()))
598        .all(txn)
599        .await?;
600
601    let mut downstream_fragments = HashMap::new();
602    let mut downstream_relations = HashMap::new();
603    for relation in downstreams {
604        let source_fragment_id = relation.source_fragment_id as FragmentId;
605        let target_fragment_id = relation.target_fragment_id as FragmentId;
606        downstream_fragments
607            .entry(source_fragment_id)
608            .or_insert(HashMap::new())
609            .insert(target_fragment_id, relation.dispatcher_type);
610        downstream_relations.insert((source_fragment_id, target_fragment_id), relation);
611    }
612
613    Ok(RescheduleContext {
614        loaded,
615        job_extra_info,
616        upstream_fragments,
617        downstream_fragments,
618        downstream_relations,
619    })
620}
621
622/// Build a `Reschedule` by diffing the previously materialized fragment state against
623/// the newly rendered actor layout.
624///
625/// This function assumes a full rebuild (no kept actors) and produces:
626/// - actor additions/removals and vnode bitmap updates
627/// - dispatcher updates for upstream/downstream fragments
628/// - updated split assignments for source actors
629///
630/// `upstream_fragments`/`downstream_fragments` describe neighbor fragments and dispatcher types,
631/// while `all_actor_dispatchers` contains the new dispatcher list for each actor. `job_extra_info`
632/// supplies job-level context for building new actors.
633fn diff_fragment(
634    prev_fragment_info: &InflightFragmentInfo,
635    curr_actors: &HashMap<ActorId, InflightActorInfo>,
636    upstream_fragments: HashMap<FragmentId, DispatcherType>,
637    downstream_fragments: HashMap<FragmentId, DispatcherType>,
638    all_actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
639    job_extra_info: Option<&StreamingJobExtraInfo>,
640) -> MetaResult<Reschedule> {
641    let prev_ids: HashSet<_> = prev_fragment_info.actors.keys().cloned().collect();
642    let curr_ids: HashSet<_> = curr_actors.keys().cloned().collect();
643
644    let removed_actors: HashSet<_> = &prev_ids - &curr_ids;
645    let added_actor_ids: HashSet<_> = &curr_ids - &prev_ids;
646    let kept_ids: HashSet<_> = prev_ids.intersection(&curr_ids).cloned().collect();
647    debug_assert!(
648        kept_ids.is_empty(),
649        "kept actors found in scale; expected full rebuild, prev={prev_ids:?}, curr={curr_ids:?}, kept={kept_ids:?}"
650    );
651
652    let mut added_actors = HashMap::new();
653    for &actor_id in &added_actor_ids {
654        let InflightActorInfo { worker_id, .. } = curr_actors
655            .get(&actor_id)
656            .ok_or_else(|| anyhow!("BUG: Worker not found for new actor {}", actor_id))?;
657
658        added_actors
659            .entry(*worker_id)
660            .or_insert_with(Vec::new)
661            .push(actor_id);
662    }
663
664    let mut vnode_bitmap_updates = HashMap::new();
665    for actor_id in kept_ids {
666        let prev_actor = &prev_fragment_info.actors[&actor_id];
667        let curr_actor = &curr_actors[&actor_id];
668
669        // Check if the vnode distribution has changed.
670        if prev_actor.vnode_bitmap != curr_actor.vnode_bitmap
671            && let Some(bitmap) = curr_actor.vnode_bitmap.clone()
672        {
673            vnode_bitmap_updates.insert(actor_id, bitmap);
674        }
675    }
676
677    let upstream_dispatcher_mapping =
678        if let DistributionType::Hash = prev_fragment_info.distribution_type {
679            let actor_mapping = curr_actors
680                .iter()
681                .map(
682                    |(
683                        actor_id,
684                        InflightActorInfo {
685                            worker_id: _,
686                            vnode_bitmap,
687                            ..
688                        },
689                    )| { (*actor_id, vnode_bitmap.clone().unwrap()) },
690                )
691                .collect();
692            Some(ActorMapping::from_bitmaps(&actor_mapping))
693        } else {
694            None
695        };
696
697    let upstream_fragment_dispatcher_ids = upstream_fragments
698        .iter()
699        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
700        .map(|(upstream_fragment, _)| (*upstream_fragment, prev_fragment_info.fragment_id))
701        .collect();
702
703    let downstream_fragment_ids = downstream_fragments
704        .iter()
705        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
706        .map(|(fragment_id, _)| *fragment_id)
707        .collect();
708
709    let extra_info = job_extra_info.cloned().unwrap_or_default();
710    let expr_context = extra_info.stream_context().to_expr_context();
711    let job_definition = extra_info.job_definition;
712    let config_override = extra_info.config_override;
713
714    let newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)> =
715        added_actor_ids
716            .iter()
717            .map(|actor_id| {
718                let actor = StreamActor {
719                    actor_id: *actor_id,
720                    fragment_id: prev_fragment_info.fragment_id,
721                    vnode_bitmap: curr_actors[actor_id].vnode_bitmap.clone(),
722                    mview_definition: job_definition.clone(),
723                    expr_context: Some(expr_context.clone()),
724                    config_override: config_override.clone(),
725                };
726                (
727                    *actor_id,
728                    (
729                        (
730                            actor,
731                            all_actor_dispatchers
732                                .get(actor_id)
733                                .cloned()
734                                .unwrap_or_default(),
735                        ),
736                        curr_actors[actor_id].worker_id,
737                    ),
738                )
739            })
740            .collect();
741
742    let actor_splits = curr_actors
743        .iter()
744        .map(|(&actor_id, info)| (actor_id, info.splits.clone()))
745        .collect();
746
747    let reschedule = Reschedule {
748        added_actors,
749        removed_actors,
750        vnode_bitmap_updates,
751        upstream_fragment_dispatcher_ids,
752        upstream_dispatcher_mapping,
753        downstream_fragment_ids,
754        actor_splits,
755        newly_created_actors,
756    };
757
758    Ok(reschedule)
759}
760
761/// Build executable reschedule plans from the rendered fragment layout.
762///
763/// Callers are expected to run the no-op layout check before invoking this
764/// function so command construction stays focused on plan materialization.
765pub(crate) fn build_reschedule_commands(
766    render_result: FragmentRenderMap,
767    context: RescheduleContext,
768    all_prev_fragments: HashMap<FragmentId, &InflightFragmentInfo>,
769) -> MetaResult<HashMap<DatabaseId, ReschedulePlan>> {
770    if render_result.is_empty() {
771        return Ok(HashMap::new());
772    }
773
774    let RescheduleContext {
775        job_extra_info,
776        upstream_fragments: mut all_upstream_fragments,
777        downstream_fragments: mut all_downstream_fragments,
778        mut downstream_relations,
779        ..
780    } = context;
781
782    let fragment_ids = render_result
783        .values()
784        .flat_map(|jobs| jobs.values())
785        .flatten()
786        .map(|(fragment_id, _)| *fragment_id)
787        .collect_vec();
788
789    let all_related_fragment_ids: HashSet<_> = fragment_ids
790        .iter()
791        .copied()
792        .chain(all_upstream_fragments.values().flatten().map(|(id, _)| *id))
793        .chain(
794            all_downstream_fragments
795                .values()
796                .flatten()
797                .map(|(id, _)| *id),
798        )
799        .collect();
800
801    for fragment_id in all_related_fragment_ids {
802        if !all_prev_fragments.contains_key(&fragment_id) {
803            return Err(MetaError::from(anyhow!(
804                "previous fragment info for {fragment_id} not found"
805            )));
806        }
807    }
808
809    let all_rendered_fragments: HashMap<_, _> = render_result
810        .values()
811        .flat_map(|jobs| jobs.values())
812        .flatten()
813        .map(|(fragment_id, info)| (*fragment_id, info))
814        .collect();
815
816    let mut commands = HashMap::new();
817
818    for (database_id, jobs) in &render_result {
819        let mut all_fragment_actors = HashMap::new();
820        let mut reschedules = HashMap::new();
821
822        for (job_id, fragment_id, fragment_info) in jobs.iter().flat_map(|(job_id, fragments)| {
823            fragments
824                .iter()
825                .map(move |(fragment_id, info)| (job_id, fragment_id, info))
826        }) {
827            let InflightFragmentInfo {
828                distribution_type,
829                actors,
830                ..
831            } = fragment_info;
832
833            let upstream_fragments = all_upstream_fragments
834                .remove(&(*fragment_id as FragmentId))
835                .unwrap_or_default();
836            let downstream_fragments = all_downstream_fragments
837                .remove(&(*fragment_id as FragmentId))
838                .unwrap_or_default();
839
840            let fragment_actors: HashMap<_, _> = upstream_fragments
841                .keys()
842                .copied()
843                .chain(downstream_fragments.keys().copied())
844                .map(|fragment_id| {
845                    all_prev_fragments
846                        .get(&fragment_id)
847                        .map(|fragment| {
848                            (
849                                fragment_id,
850                                fragment.actors.keys().copied().collect::<HashSet<_>>(),
851                            )
852                        })
853                        .ok_or_else(|| {
854                            MetaError::from(anyhow!(
855                                "fragment {} not found in previous state",
856                                fragment_id
857                            ))
858                        })
859                })
860                .collect::<MetaResult<_>>()?;
861
862            all_fragment_actors.extend(fragment_actors);
863
864            let source_fragment_actors = actors
865                .iter()
866                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
867                .collect();
868
869            let mut all_actor_dispatchers: HashMap<_, Vec<_>> = HashMap::new();
870
871            for downstream_fragment_id in downstream_fragments.keys() {
872                let target_fragment_actors =
873                    match all_rendered_fragments.get(downstream_fragment_id) {
874                        None => {
875                            let external_fragment = all_prev_fragments
876                                .get(downstream_fragment_id)
877                                .ok_or_else(|| {
878                                    MetaError::from(anyhow!(
879                                        "fragment {} not found in previous state",
880                                        downstream_fragment_id
881                                    ))
882                                })?;
883
884                            external_fragment
885                                .actors
886                                .iter()
887                                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
888                                .collect()
889                        }
890                        Some(downstream_rendered) => downstream_rendered
891                            .actors
892                            .iter()
893                            .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
894                            .collect(),
895                    };
896
897                let target_fragment_distribution = *distribution_type;
898
899                let fragment_relation::Model {
900                    source_fragment_id: _,
901                    target_fragment_id: _,
902                    dispatcher_type,
903                    dist_key_indices,
904                    output_indices,
905                    output_type_mapping,
906                } = downstream_relations
907                    .remove(&(
908                        *fragment_id as FragmentId,
909                        *downstream_fragment_id as FragmentId,
910                    ))
911                    .ok_or_else(|| {
912                        MetaError::from(anyhow!(
913                            "downstream relation missing for {} -> {}",
914                            fragment_id,
915                            downstream_fragment_id
916                        ))
917                    })?;
918
919                let pb_mapping = PbDispatchOutputMapping {
920                    indices: output_indices.into_u32_array(),
921                    types: output_type_mapping.unwrap_or_default().to_protobuf(),
922                };
923
924                let (dispatchers, _) = compose_dispatchers(
925                    *distribution_type,
926                    &source_fragment_actors,
927                    *downstream_fragment_id,
928                    target_fragment_distribution,
929                    &target_fragment_actors,
930                    dispatcher_type,
931                    dist_key_indices.into_u32_array(),
932                    pb_mapping,
933                );
934
935                for (actor_id, dispatcher) in dispatchers {
936                    all_actor_dispatchers
937                        .entry(actor_id)
938                        .or_default()
939                        .push(dispatcher);
940                }
941            }
942
943            let prev_fragment = all_prev_fragments.get(&{ *fragment_id }).ok_or_else(|| {
944                MetaError::from(anyhow!(
945                    "fragment {} not found in previous state",
946                    fragment_id
947                ))
948            })?;
949
950            let reschedule = diff_fragment(
951                prev_fragment,
952                actors,
953                upstream_fragments,
954                downstream_fragments,
955                all_actor_dispatchers,
956                job_extra_info.get(job_id),
957            )?;
958
959            reschedules.insert(*fragment_id as FragmentId, reschedule);
960        }
961
962        let command = ReschedulePlan {
963            reschedules,
964            fragment_actors: all_fragment_actors,
965        };
966
967        debug_assert!(
968            command
969                .reschedules
970                .values()
971                .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
972            "reschedule plan carries vnode_bitmap_updates, expected full rebuild"
973        );
974
975        commands.insert(*database_id, command);
976    }
977
978    Ok(commands)
979}
980
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_connector::source::SplitImpl;
    use risingwave_connector::source::test_source::TestSourceSplit;

    use super::*;

    /// Builds an actor placed on `worker_id` that owns `vnode_bitmap` and
    /// carries the given source splits.
    fn actor_with_splits(
        worker_id: impl Into<WorkerId>,
        vnode_bitmap: Option<Bitmap>,
        splits: Vec<SplitImpl>,
    ) -> InflightActorInfo {
        InflightActorInfo {
            worker_id: worker_id.into(),
            vnode_bitmap,
            splits,
        }
    }

    /// Shorthand for an actor without any source splits.
    fn actor(worker_id: impl Into<WorkerId>, vnode_bitmap: Option<Bitmap>) -> InflightActorInfo {
        actor_with_splits(worker_id, vnode_bitmap, vec![])
    }

    /// A `SplitImpl::Test` split with the given id and offset.
    fn test_split(id: &str, offset: &str) -> SplitImpl {
        SplitImpl::Test(TestSourceSplit {
            id: id.into(),
            properties: HashMap::new(),
            offset: offset.to_owned(),
        })
    }

    /// A hash-distributed fragment over 8 vnodes holding the given actors.
    fn fragment(
        fragment_id: FragmentId,
        actors: impl IntoIterator<Item = (ActorId, InflightActorInfo)>,
    ) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 8,
            nodes: Default::default(),
            actors: actors.into_iter().collect(),
            state_table_ids: HashSet::new(),
        }
    }

    /// Wraps fragments into a render map with a single database (id 1) and a
    /// single job (id 1).
    fn render_result(fragments: Vec<(FragmentId, InflightFragmentInfo)>) -> FragmentRenderMap {
        let job_fragments: HashMap<_, _> = fragments.into_iter().collect();
        let jobs: HashMap<_, _> = [(JobId::new(1), job_fragments)].into_iter().collect();
        [(DatabaseId::new(1), jobs)].into_iter().collect()
    }

    /// The comparison must be insensitive to concrete actor ids: rendered
    /// fragments with identical worker placement and vnode split match the
    /// current state even when every actor id differs.
    #[test]
    fn rendered_layout_matches_current_ignores_actor_ids_across_fragments() {
        let low_half = || Some(Bitmap::from_indices(8, &[0, 1, 2, 3]));
        let high_half = || Some(Bitmap::from_indices(8, &[4, 5, 6, 7]));

        let current_source = fragment(
            FragmentId::new(1),
            [
                (ActorId::new(101), actor(1, low_half())),
                (ActorId::new(102), actor(2, high_half())),
            ],
        );
        let current_target = fragment(
            FragmentId::new(2),
            [
                (ActorId::new(201), actor(3, low_half())),
                (ActorId::new(202), actor(4, high_half())),
            ],
        );

        // Same placement and bitmaps, entirely different actor ids.
        let rendered = render_result(vec![
            (
                FragmentId::new(1),
                fragment(
                    FragmentId::new(1),
                    [
                        (ActorId::new(1001), actor(1, low_half())),
                        (ActorId::new(1002), actor(2, high_half())),
                    ],
                ),
            ),
            (
                FragmentId::new(2),
                fragment(
                    FragmentId::new(2),
                    [
                        (ActorId::new(2001), actor(3, low_half())),
                        (ActorId::new(2002), actor(4, high_half())),
                    ],
                ),
            ),
        ]);

        let prev = HashMap::from([
            (FragmentId::new(1), &current_source),
            (FragmentId::new(2), &current_target),
        ]);

        assert!(rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    /// A changed vnode distribution in any rendered fragment (here: fragment 2
    /// moves from a 4/4 split to a 6/2 split) must be detected as a mismatch.
    #[test]
    fn rendered_layout_matches_current_detects_target_fragment_layout_changes() {
        let low_half = || Some(Bitmap::from_indices(8, &[0, 1, 2, 3]));
        let high_half = || Some(Bitmap::from_indices(8, &[4, 5, 6, 7]));

        let current_source = fragment(
            FragmentId::new(1),
            [
                (ActorId::new(101), actor(1, low_half())),
                (ActorId::new(102), actor(2, high_half())),
            ],
        );
        let current_target = fragment(
            FragmentId::new(2),
            [
                (ActorId::new(201), actor(3, low_half())),
                (ActorId::new(202), actor(4, high_half())),
            ],
        );

        // Fragment 1 keeps its layout; fragment 2 is rendered with a skewed
        // 6/2 vnode split.
        let rendered = render_result(vec![
            (
                FragmentId::new(1),
                fragment(
                    FragmentId::new(1),
                    [
                        (ActorId::new(1001), actor(1, low_half())),
                        (ActorId::new(1002), actor(2, high_half())),
                    ],
                ),
            ),
            (
                FragmentId::new(2),
                fragment(
                    FragmentId::new(2),
                    [
                        (
                            ActorId::new(2001),
                            actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3, 4, 5]))),
                        ),
                        (
                            ActorId::new(2002),
                            actor(4, Some(Bitmap::from_indices(8, &[6, 7]))),
                        ),
                    ],
                ),
            ),
        ]);

        let prev = HashMap::from([
            (FragmentId::new(1), &current_source),
            (FragmentId::new(2), &current_target),
        ]);

        assert!(!rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    /// Split offsets are runtime progress rather than layout: the same split id
    /// at a different offset must not register as a layout change.
    #[test]
    fn rendered_layout_matches_current_ignores_split_offsets() {
        let fragment_id = FragmentId::new(1);
        let bitmap = || Some(Bitmap::from_indices(8, &[0, 1, 2, 3]));

        let prev_fragment = fragment(
            fragment_id,
            [(
                ActorId::new(101),
                actor_with_splits(1, bitmap(), vec![test_split("split-0", "100")]),
            )],
        );
        let rendered = render_result(vec![(
            fragment_id,
            fragment(
                fragment_id,
                [(
                    ActorId::new(1001),
                    actor_with_splits(1, bitmap(), vec![test_split("split-0", "200")]),
                )],
            ),
        )]);
        let prev = HashMap::from([(fragment_id, &prev_fragment)]);

        assert!(rendered_layout_matches_current(&rendered, &prev).unwrap());
    }
}
1222
/// Reschedule policy that changes only the streaming parallelism of a job.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParallelismPolicy {
    // The desired parallelism for the job.
    pub parallelism: StreamingParallelism,
}
1227
/// Reschedule policy that changes the resource group a job is scheduled on.
#[derive(Clone, Debug)]
pub struct ResourceGroupPolicy {
    // Target resource group. NOTE(review): `None` presumably falls back to the
    // database/default resource group — confirm against the consumer of this policy.
    pub resource_group: Option<String>,
}
1232
/// Which aspect(s) of a streaming job a reschedule request changes.
#[derive(Clone, Debug)]
pub enum ReschedulePolicy {
    /// Change only the parallelism.
    Parallelism(ParallelismPolicy),
    /// Change only the resource group.
    ResourceGroup(ResourceGroupPolicy),
    /// Change both parallelism and resource group in one reschedule.
    Both(ParallelismPolicy, ResourceGroupPolicy),
}
1239
impl GlobalStreamManager {
    /// Acquires the reschedule lock in shared (read) mode, allowing concurrent
    /// readers while excluding an exclusive reschedule pass.
    #[await_tree::instrument("acquire_reschedule_read_guard")]
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    /// Acquires the reschedule lock in exclusive (write) mode, serializing the
    /// caller against all other reschedule activity.
    #[await_tree::instrument("acquire_reschedule_write_guard")]
    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }

    /// When new worker nodes joined, or the parallelism of existing worker nodes changed,
    /// examines if there are any jobs can be scaled, and scales them if found.
    ///
    /// This method will iterate over all `CREATED` jobs, and can be repeatedly called.
    ///
    /// Returns
    /// - `Ok(false)` if no jobs can be scaled;
    /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled.
    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
        tracing::info!("trigger parallelism control");

        // Hold the exclusive reschedule lock for the entire pass so no other
        // reschedule can interleave with the rerender/barrier commands below.
        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_streaming_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        // Jobs that must not be rescheduled right now because of in-progress
        // background creating jobs; their presence drives the `Ok(true)` retry
        // signal at the end.
        let blocked_jobs = self
            .metadata_manager
            .collect_reschedule_blocked_jobs_for_creating_jobs(&background_streaming_jobs, true)
            .await?;
        let has_blocked_jobs = !blocked_jobs.is_empty();

        let database_objects: HashMap<risingwave_meta_model::DatabaseId, Vec<JobId>> = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_with_database()
            .await?;

        // Interleave jobs round-robin across databases: sort by each job's
        // position within its database list first, then by database id. This
        // mixes databases within a batch instead of draining one database
        // before touching the next. Blocked jobs are filtered out afterwards.
        let job_ids = database_objects
            .iter()
            .flat_map(|(database_id, job_ids)| {
                job_ids
                    .iter()
                    .enumerate()
                    .map(move |(idx, job_id)| (idx, database_id, job_id))
            })
            .sorted_by(|(idx_a, database_a, _), (idx_b, database_b, _)| {
                idx_a.cmp(idx_b).then(database_a.cmp(database_b))
            })
            .map(|(_, database_id, job_id)| (*database_id, *job_id))
            .filter(|(_, job_id)| !blocked_jobs.contains(job_id))
            .collect_vec();

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
            // Retry periodically while some jobs are temporarily blocked by creating
            // unreschedulable backfill jobs. This allows us to scale them
            // automatically once the creating jobs finish.
            return Ok(has_blocked_jobs);
        }

        let active_workers =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        tracing::info!(
            "trigger parallelism control for jobs: {:#?}, workers {:#?}",
            job_ids,
            active_workers.current()
        );

        // A configured batch size of 0 means "process everything in one batch".
        let batch_size = match self.env.opts.parallelism_control_batch_size {
            0 => job_ids.len(),
            n => n,
        };

        tracing::info!(
            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
            job_ids.len(),
            batch_size,
            active_workers.current()
        );

        let batches: Vec<_> = job_ids
            .into_iter()
            .chunks(batch_size)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect();

        // Re-render each batch and submit the resulting per-database commands
        // to the barrier scheduler concurrently; a single failure aborts the
        // whole pass (the next tick retries).
        for batch in batches {
            let jobs = batch.iter().map(|(_, job_id)| *job_id).collect();

            let commands = self.scale_controller.rerender(jobs).await?;

            let futures = commands.into_iter().map(|(database_id, command)| {
                let barrier_scheduler = self.barrier_scheduler.clone();
                async move { barrier_scheduler.run_command(database_id, command).await }
            });

            let _results = future::try_join_all(futures).await?;
        }

        Ok(has_blocked_jobs)
    }

    /// Handles notification of worker node activation and deletion, and triggers parallelism control.
    ///
    /// Event loop driven by two sources: a periodic ticker (gated by
    /// `should_trigger`) and local notifications from the notification manager.
    /// Exits when `shutdown_rx` resolves.
    async fn run(&self, mut shutdown_rx: Receiver<()>) {
        tracing::info!("starting automatic parallelism control monitor");

        let check_period =
            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

        // First tick fires after a configurable initial delay; subsequent ticks
        // every `check_period`. Skip (rather than burst) missed ticks.
        let mut ticker = tokio::time::interval_at(
            Instant::now()
                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
            check_period,
        );
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        // Consume the initial tick so the loop below only reacts to the
        // periodic ticks that follow.
        ticker.tick().await;

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        // Local cache of active streaming compute nodes, keyed by worker id and
        // kept in sync via the notifications handled below.
        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        // Whether the next tick should run a parallelism-control pass.
        let mut should_trigger = false;

        loop {
            tokio::select! {
                // `biased` makes shutdown win over a simultaneously-ready tick
                // or notification.
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            // Keep triggering while the pass reports more work
                            // (e.g. jobs still blocked by background creation).
                            should_trigger = cont;
                        }
                        Err(e) => {
                            // Leave `should_trigger` set and push the next tick a
                            // full period out before retrying.
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    // Only maintain the cache for streaming compute nodes.
                    // NOTE(review): `property.as_ref().unwrap()` assumes compute
                    // nodes always carry a property — confirm this invariant holds.
                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            // A changed adaptive-parallelism strategy warrants a
                            // new control pass; identical strategy is a no-op.
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = %worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

                            // Trigger only on meaningful changes: parallelism or
                            // resource-group change of a known worker, or a brand
                            // new worker. Re-activation with identical attributes
                            // falls through to the no-op arm.
                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism()  => {
                                    tracing::info!(worker = %worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group()  => {
                                    tracing::info!(worker = %worker.id, "worker label changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = %worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        // Since our logic for handling passive scale-in is within the barrier manager,
                        // there is not much we can do here. All we can do is proactively remove the
                        // entries from our cache.
                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = %prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = %worker.id, "worker not found in stream manager cache, but it was removed");
                                }
                            }
                        }

                        LocalNotification::StreamingJobBackfillFinished(job_id) => {
                            tracing::debug!(job_id = %job_id, "received backfill finished notification");
                            if let Err(e) = self.apply_post_backfill_parallelism(job_id).await {
                                tracing::warn!(job_id = %job_id, error = %e.as_report(), "failed to restore parallelism after backfill");
                                // Retry in the next periodic tick. This avoids triggering
                                // unnecessary full control passes when restore succeeds.
                                should_trigger = true;
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

    /// Restores a streaming job's parallelism to its target value after backfill completes.
    ///
    /// No-op when the job no longer exists, when no backfill override was
    /// configured, or when the override already equals the target. Otherwise
    /// submits a parallelism-only reschedule.
    async fn apply_post_backfill_parallelism(&self, job_id: JobId) -> MetaResult<()> {
        // Fetch both the target parallelism (final desired state) and the backfill parallelism
        // (temporary parallelism used during backfill phase) from the catalog.
        let Some((target, backfill_parallelism)) = self
            .metadata_manager
            .catalog_controller
            .get_job_parallelisms(job_id)
            .await?
        else {
            // The job may have been dropped before this notification is processed.
            // Treat it as a benign no-op so we don't trigger unnecessary retries.
            tracing::debug!(
                job_id = %job_id,
                "streaming job not found when applying post-backfill parallelism, skip"
            );
            return Ok(());
        };

        // Determine if we need to reschedule based on the backfill configuration.
        match backfill_parallelism {
            Some(backfill_parallelism) if backfill_parallelism == target => {
                // Backfill parallelism matches target - no reschedule needed since the job
                // is already running at the desired parallelism.
                tracing::debug!(
                    job_id = %job_id,
                    ?backfill_parallelism,
                    ?target,
                    "backfill parallelism equals job parallelism, skip reschedule"
                );
                return Ok(());
            }
            Some(_) => {
                // Backfill parallelism differs from target - proceed to restore target parallelism.
            }
            None => {
                // No backfill parallelism was configured, meaning the job was created without
                // a special backfill override. No reschedule is necessary.
                tracing::debug!(
                    job_id = %job_id,
                    ?target,
                    "no backfill parallelism configured, skip post-backfill reschedule"
                );
                return Ok(());
            }
        }

        // Reschedule the job to restore its target parallelism.
        tracing::info!(
            job_id = %job_id,
            ?target,
            ?backfill_parallelism,
            "restoring parallelism after backfill via reschedule"
        );
        let policy = ReschedulePolicy::Parallelism(ParallelismPolicy {
            parallelism: target,
        });
        if let Err(e) = self.reschedule_streaming_job(job_id, policy, false).await {
            tracing::warn!(job_id = %job_id, error = %e.as_report(), "reschedule after backfill failed");
            return Err(e);
        }

        tracing::info!(job_id = %job_id, "parallelism reschedule after backfill submitted");
        Ok(())
    }

    /// Spawns the [`Self::run`] monitor loop on the tokio runtime.
    ///
    /// Returns the task's join handle together with a oneshot sender that
    /// shuts the loop down when fired (or dropped).
    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}