risingwave_meta/stream/
scale.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, min};
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
17use std::fmt::Debug;
18use std::num::NonZeroUsize;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use itertools::Itertools;
24use num_integer::Integer;
25use num_traits::abs;
26use risingwave_common::bail;
27use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
28use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
29use risingwave_common::hash::ActorMapping;
30use risingwave_common::util::iter_util::ZipEqDebug;
31use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
32use risingwave_pb::common::{WorkerNode, WorkerType};
33use risingwave_pb::meta::FragmentWorkerSlotMappings;
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35use risingwave_pb::meta::table_fragments::fragment::{
36    FragmentDistributionType, PbFragmentDistributionType,
37};
38use risingwave_pb::meta::table_fragments::{self, State};
39use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
40use thiserror_ext::AsReport;
41use tokio::sync::oneshot::Receiver;
42use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
43use tokio::task::JoinHandle;
44use tokio::time::{Instant, MissedTickBehavior};
45
46use crate::barrier::{Command, Reschedule};
47use crate::controller::scale::RescheduleWorkingSet;
48use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
49use crate::model::{
50    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
51};
52use crate::serving::{
53    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
54};
55use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
56use crate::{MetaError, MetaResult};
57
58#[derive(Debug, Clone, Eq, PartialEq)]
59pub struct WorkerReschedule {
60    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
61}
62
63pub struct CustomFragmentInfo {
64    pub job_id: u32,
65    pub fragment_id: u32,
66    pub fragment_type_mask: FragmentTypeMask,
67    pub distribution_type: PbFragmentDistributionType,
68    pub state_table_ids: Vec<u32>,
69    pub node: StreamNode,
70    pub actor_template: StreamActorWithDispatchers,
71    pub actors: Vec<CustomActorInfo>,
72}
73
74#[derive(Default, Clone)]
75pub struct CustomActorInfo {
76    pub actor_id: u32,
77    pub fragment_id: u32,
78    pub dispatcher: Vec<Dispatcher>,
79    /// `None` if singleton.
80    pub vnode_bitmap: Option<Bitmap>,
81}
82
83use educe::Educe;
84use risingwave_common::system_param::AdaptiveParallelismStrategy;
85use risingwave_common::system_param::reader::SystemParamsRead;
86use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
87use risingwave_meta_model::DispatcherType;
88use risingwave_pb::stream_plan::stream_node::NodeBody;
89
90use crate::controller::id::IdCategory;
91use crate::controller::utils::filter_workers_by_resource_group;
92use crate::stream::cdc::assign_cdc_table_snapshot_splits_impl;
93
94// The debug implementation is arbitrary. Just used in debug logs.
95#[derive(Educe)]
96#[educe(Debug)]
97pub struct RescheduleContext {
98    /// Meta information for all Actors
99    #[educe(Debug(ignore))]
100    actor_map: HashMap<ActorId, CustomActorInfo>,
101    /// Status of all Actors, used to find the location of the `Actor`
102    actor_status: BTreeMap<ActorId, WorkerId>,
103    /// Meta information of all `Fragment`, used to find the `Fragment`'s `Actor`
104    #[educe(Debug(ignore))]
105    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
106    /// Fragments with `StreamSource`
107    stream_source_fragment_ids: HashSet<FragmentId>,
108    /// Fragments with `StreamSourceBackfill` and the corresponding upstream source fragment
109    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
110    /// Target fragments in `NoShuffle` relation
111    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
112    /// Source fragments in `NoShuffle` relation
113    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
114    // index for dispatcher type from upstream fragment to downstream fragment
115    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
116    fragment_upstreams: HashMap<
117        risingwave_meta_model::FragmentId,
118        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
119    >,
120}
121
122impl RescheduleContext {
123    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
124        self.actor_status
125            .get(actor_id)
126            .cloned()
127            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
128    }
129}
130
131/// This function provides an simple balancing method
132/// The specific process is as follows
133///
134/// 1. Calculate the number of target actors, and calculate the average value and the remainder, and
135///    use the average value as expected.
136///
137/// 2. Filter out the actor to be removed and the actor to be retained, and sort them from largest
138///    to smallest (according to the number of virtual nodes held).
139///
140/// 3. Calculate their balance, 1) For the actors to be removed, the number of virtual nodes per
141///    actor is the balance. 2) For retained actors, the number of virtual nodes - expected is the
142///    balance. 3) For newly created actors, -expected is the balance (always negative).
143///
144/// 4. Allocate the remainder, high priority to newly created nodes.
145///
146/// 5. After that, merge removed, retained and created into a queue, with the head of the queue
147///    being the source, and move the virtual nodes to the destination at the end of the queue.
148///
149/// This can handle scale in, scale out, migration, and simultaneous scaling with as much affinity
150/// as possible.
151///
152/// Note that this function can only rebalance actors whose `vnode_bitmap` is not `None`, in other
153/// words, for `Fragment` of `FragmentDistributionType::Single`, using this function will cause
154/// assert to fail and should be skipped from the upper level.
155///
156/// The return value is the bitmap distribution after scaling, which covers all virtual node indexes
157pub fn rebalance_actor_vnode(
158    actors: &[CustomActorInfo],
159    actors_to_remove: &BTreeSet<ActorId>,
160    actors_to_create: &BTreeSet<ActorId>,
161) -> HashMap<ActorId, Bitmap> {
162    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();
163
164    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
165    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);
166
167    assert!(actors.len() >= actors_to_remove.len());
168
169    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
170    assert!(target_actor_count > 0);
171
172    // `vnode_bitmap` must be set on distributed fragments.
173    let vnode_count = actors[0]
174        .vnode_bitmap
175        .as_ref()
176        .expect("vnode bitmap unset")
177        .len();
178
179    // represents the balance of each actor, used to sort later
180    #[derive(Debug)]
181    struct Balance {
182        actor_id: ActorId,
183        balance: i32,
184        builder: BitmapBuilder,
185    }
186    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);
187
188    tracing::debug!(
189        "expected {}, remain {}, prev actors {}, target actors {}",
190        expected,
191        remain,
192        actors.len(),
193        target_actor_count,
194    );
195
196    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
197        .iter()
198        .map(|actor| {
199            (
200                actor.actor_id as ActorId,
201                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
202            )
203        })
204        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));
205
206    let order_by_bitmap_desc =
207        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
208            bitmap_a
209                .count_ones()
210                .cmp(&bitmap_b.count_ones())
211                .reverse()
212                .then(id_a.cmp(id_b))
213        };
214
215    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
216        let mut builder = BitmapBuilder::default();
217        builder.append_bitmap(bitmap);
218        builder
219    };
220
221    let (prev_expected, _) = vnode_count.div_rem(&actors.len());
222
223    let prev_remain = removed
224        .iter()
225        .map(|(_, bitmap)| {
226            assert!(bitmap.count_ones() >= prev_expected);
227            bitmap.count_ones() - prev_expected
228        })
229        .sum::<usize>();
230
231    removed.sort_by(order_by_bitmap_desc);
232    rest.sort_by(order_by_bitmap_desc);
233
234    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
235        actor_id,
236        balance: bitmap.count_ones() as i32,
237        builder: builder_from_bitmap(&bitmap),
238    });
239
240    let mut rest_balances = rest
241        .into_iter()
242        .map(|(actor_id, bitmap)| Balance {
243            actor_id,
244            balance: bitmap.count_ones() as i32 - expected as i32,
245            builder: builder_from_bitmap(&bitmap),
246        })
247        .collect_vec();
248
249    let mut created_balances = actors_to_create
250        .iter()
251        .map(|actor_id| Balance {
252            actor_id: *actor_id,
253            balance: -(expected as i32),
254            builder: BitmapBuilder::zeroed(vnode_count),
255        })
256        .collect_vec();
257
258    for balance in created_balances
259        .iter_mut()
260        .rev()
261        .take(prev_remain)
262        .chain(rest_balances.iter_mut())
263    {
264        if remain > 0 {
265            balance.balance -= 1;
266            remain -= 1;
267        }
268    }
269
270    // consume the rest `remain`
271    for balance in &mut created_balances {
272        if remain > 0 {
273            balance.balance -= 1;
274            remain -= 1;
275        }
276    }
277
278    assert_eq!(remain, 0);
279
280    let mut v: VecDeque<_> = removed_balances
281        .chain(rest_balances)
282        .chain(created_balances)
283        .collect();
284
285    // We will return the full bitmap here after rebalancing,
286    // if we want to return only the changed actors, filter balance = 0 here
287    let mut result = HashMap::with_capacity(target_actor_count);
288
289    for balance in &v {
290        tracing::debug!(
291            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
292            balance.actor_id,
293            balance.balance,
294            actors_to_remove.contains(&balance.actor_id),
295            actors_to_create.contains(&balance.actor_id)
296        );
297    }
298
299    while !v.is_empty() {
300        if v.len() == 1 {
301            let single = v.pop_front().unwrap();
302            assert_eq!(single.balance, 0);
303            if !actors_to_remove.contains(&single.actor_id) {
304                result.insert(single.actor_id, single.builder.finish());
305            }
306
307            continue;
308        }
309
310        let mut src = v.pop_front().unwrap();
311        let mut dst = v.pop_back().unwrap();
312
313        let n = min(abs(src.balance), abs(dst.balance));
314
315        let mut moved = 0;
316        for idx in (0..vnode_count).rev() {
317            if moved >= n {
318                break;
319            }
320
321            if src.builder.is_set(idx) {
322                src.builder.set(idx, false);
323                assert!(!dst.builder.is_set(idx));
324                dst.builder.set(idx, true);
325                moved += 1;
326            }
327        }
328
329        src.balance -= n;
330        dst.balance += n;
331
332        if src.balance != 0 {
333            v.push_front(src);
334        } else if !actors_to_remove.contains(&src.actor_id) {
335            result.insert(src.actor_id, src.builder.finish());
336        }
337
338        if dst.balance != 0 {
339            v.push_back(dst);
340        } else {
341            result.insert(dst.actor_id, dst.builder.finish());
342        }
343    }
344
345    result
346}
347
/// Flags controlling how a reschedule request is resolved.
#[derive(Clone, Copy, Debug)]
pub struct RescheduleOptions {
    /// Resolve the upstream of `NoShuffle` edges while scaling: check that all reschedules inside
    /// a `NoShuffle` dependency tree correspond to each other, and rewrite them onto the root of
    /// that tree.
    pub resolve_no_shuffle_upstream: bool,

    /// Skip creating new actors: when `true`, the scaled-out actors are not created.
    pub skip_create_new_actors: bool,
}
356
357pub type ScaleControllerRef = Arc<ScaleController>;
358
359pub struct ScaleController {
360    pub metadata_manager: MetadataManager,
361
362    pub source_manager: SourceManagerRef,
363
364    pub env: MetaSrvEnv,
365
366    /// We will acquire lock during DDL to prevent scaling operations on jobs that are in the creating state.
367    /// e.g., a MV cannot be rescheduled during foreground backfill.
368    pub reschedule_lock: RwLock<()>,
369}
370
371impl ScaleController {
372    pub fn new(
373        metadata_manager: &MetadataManager,
374        source_manager: SourceManagerRef,
375        env: MetaSrvEnv,
376    ) -> Self {
377        Self {
378            metadata_manager: metadata_manager.clone(),
379            source_manager,
380            env,
381            reschedule_lock: RwLock::new(()),
382        }
383    }
384
385    pub async fn integrity_check(&self) -> MetaResult<()> {
386        self.metadata_manager
387            .catalog_controller
388            .integrity_check()
389            .await
390    }
391
392    /// Build the context for rescheduling and do some validation for the request.
393    async fn build_reschedule_context(
394        &self,
395        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
396        options: RescheduleOptions,
397        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
398    ) -> MetaResult<RescheduleContext> {
399        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
400            .metadata_manager
401            .list_active_streaming_compute_nodes()
402            .await?
403            .into_iter()
404            .map(|worker_node| (worker_node.id as _, worker_node))
405            .collect();
406
407        if worker_nodes.is_empty() {
408            bail!("no available compute node in the cluster");
409        }
410
411        // Check if we are trying to move a fragment to a node marked as unschedulable
412        let unschedulable_worker_ids: HashSet<_> = worker_nodes
413            .values()
414            .filter(|w| {
415                w.property
416                    .as_ref()
417                    .map(|property| property.is_unschedulable)
418                    .unwrap_or(false)
419            })
420            .map(|worker| worker.id as WorkerId)
421            .collect();
422
423        for (fragment_id, reschedule) in &*reschedule {
424            for (worker_id, change) in &reschedule.worker_actor_diff {
425                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
426                    bail!(
427                        "unable to move fragment {} to unschedulable worker {}",
428                        fragment_id,
429                        worker_id
430                    );
431                }
432            }
433        }
434
435        // FIXME: the same as anther place calling `list_table_fragments` in scaling.
436        // Index for StreamActor
437        let mut actor_map = HashMap::new();
438        // Index for Fragment
439        let mut fragment_map = HashMap::new();
440        // Index for actor status, including actor's worker id
441        let mut actor_status = BTreeMap::new();
442        let mut fragment_state = HashMap::new();
443        let mut fragment_to_table = HashMap::new();
444
445        fn fulfill_index_by_fragment_ids(
446            actor_map: &mut HashMap<u32, CustomActorInfo>,
447            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
448            actor_status: &mut BTreeMap<ActorId, WorkerId>,
449            fragment_state: &mut HashMap<FragmentId, State>,
450            fragment_to_table: &mut HashMap<FragmentId, TableId>,
451            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
452            actors: HashMap<ActorId, actor::Model>,
453            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
454            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
455        ) {
456            let mut fragment_actors: HashMap<
457                risingwave_meta_model::FragmentId,
458                Vec<CustomActorInfo>,
459            > = HashMap::new();
460
461            let mut expr_contexts = HashMap::new();
462            for (
463                _,
464                actor::Model {
465                    actor_id,
466                    fragment_id,
467                    status: _,
468                    worker_id,
469                    vnode_bitmap,
470                    expr_context,
471                    ..
472                },
473            ) in actors
474            {
475                let dispatchers = actor_dispatchers
476                    .remove(&(actor_id as _))
477                    .unwrap_or_default();
478
479                let actor_info = CustomActorInfo {
480                    actor_id: actor_id as _,
481                    fragment_id: fragment_id as _,
482                    dispatcher: dispatchers,
483                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
484                };
485
486                actor_map.insert(actor_id as _, actor_info.clone());
487
488                fragment_actors
489                    .entry(fragment_id as _)
490                    .or_default()
491                    .push(actor_info);
492
493                actor_status.insert(actor_id as _, worker_id as WorkerId);
494
495                expr_contexts.insert(actor_id as u32, expr_context);
496            }
497
498            for (
499                _,
500                fragment::Model {
501                    fragment_id,
502                    job_id,
503                    fragment_type_mask,
504                    distribution_type,
505                    stream_node,
506                    state_table_ids,
507                    ..
508                },
509            ) in fragments
510            {
511                let actors = fragment_actors
512                    .remove(&(fragment_id as _))
513                    .unwrap_or_default();
514
515                let CustomActorInfo {
516                    actor_id,
517                    fragment_id,
518                    dispatcher,
519                    vnode_bitmap,
520                } = actors.first().unwrap().clone();
521
522                let (related_job, job_definition) =
523                    related_jobs.get(&job_id).expect("job not found");
524
525                let fragment = CustomFragmentInfo {
526                    job_id: job_id as _,
527                    fragment_id: fragment_id as _,
528                    fragment_type_mask: fragment_type_mask.into(),
529                    distribution_type: distribution_type.into(),
530                    state_table_ids: state_table_ids.into_u32_array(),
531                    node: stream_node.to_protobuf(),
532                    actor_template: (
533                        StreamActor {
534                            actor_id,
535                            fragment_id: fragment_id as _,
536                            vnode_bitmap,
537                            mview_definition: job_definition.to_owned(),
538                            expr_context: expr_contexts
539                                .get(&actor_id)
540                                .cloned()
541                                .map(|expr_context| expr_context.to_protobuf()),
542                        },
543                        dispatcher,
544                    ),
545                    actors,
546                };
547
548                fragment_map.insert(fragment_id as _, fragment);
549
550                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));
551
552                fragment_state.insert(
553                    fragment_id,
554                    table_fragments::PbState::from(related_job.job_status),
555                );
556            }
557        }
558        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
559        let working_set = self
560            .metadata_manager
561            .catalog_controller
562            .resolve_working_set_for_reschedule_fragments(fragment_ids)
563            .await?;
564
565        fulfill_index_by_fragment_ids(
566            &mut actor_map,
567            &mut fragment_map,
568            &mut actor_status,
569            &mut fragment_state,
570            &mut fragment_to_table,
571            working_set.fragments,
572            working_set.actors,
573            working_set.actor_dispatchers,
574            working_set.related_jobs,
575        );
576
577        // NoShuffle relation index
578        let mut no_shuffle_source_fragment_ids = HashSet::new();
579        let mut no_shuffle_target_fragment_ids = HashSet::new();
580
581        Self::build_no_shuffle_relation_index(
582            &actor_map,
583            &mut no_shuffle_source_fragment_ids,
584            &mut no_shuffle_target_fragment_ids,
585        );
586
587        if options.resolve_no_shuffle_upstream {
588            let original_reschedule_keys = reschedule.keys().cloned().collect();
589
590            Self::resolve_no_shuffle_upstream_fragments(
591                reschedule,
592                &no_shuffle_source_fragment_ids,
593                &no_shuffle_target_fragment_ids,
594                &working_set.fragment_upstreams,
595            )?;
596
597            if !table_parallelisms.is_empty() {
598                // We need to reiterate through the NO_SHUFFLE dependencies in order to ascertain which downstream table the custom modifications of the table have been propagated from.
599                Self::resolve_no_shuffle_upstream_tables(
600                    original_reschedule_keys,
601                    &no_shuffle_source_fragment_ids,
602                    &no_shuffle_target_fragment_ids,
603                    &fragment_to_table,
604                    &working_set.fragment_upstreams,
605                    table_parallelisms,
606                )?;
607            }
608        }
609
610        let mut fragment_dispatcher_map = HashMap::new();
611        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);
612
613        let mut stream_source_fragment_ids = HashSet::new();
614        let mut stream_source_backfill_fragment_ids = HashMap::new();
615        let mut no_shuffle_reschedule = HashMap::new();
616        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
617            let fragment = fragment_map
618                .get(fragment_id)
619                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
620
621            // Check if the rescheduling is supported.
622            match fragment_state[fragment_id] {
623                table_fragments::State::Unspecified => unreachable!(),
624                state @ table_fragments::State::Initial => {
625                    bail!(
626                        "the materialized view of fragment {fragment_id} is in state {}",
627                        state.as_str_name()
628                    )
629                }
630                state @ table_fragments::State::Creating => {
631                    let stream_node = &fragment.node;
632
633                    let mut is_reschedulable = true;
634                    visit_stream_node_cont(stream_node, |body| {
635                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
636                            if !node.stream_scan_type().is_reschedulable() {
637                                is_reschedulable = false;
638
639                                // fail fast
640                                return false;
641                            }
642
643                            // continue visiting
644                            return true;
645                        }
646
647                        // continue visiting
648                        true
649                    });
650
651                    if !is_reschedulable {
652                        bail!(
653                            "the materialized view of fragment {fragment_id} is in state {}",
654                            state.as_str_name()
655                        )
656                    }
657                }
658                table_fragments::State::Created => {}
659            }
660
661            if no_shuffle_target_fragment_ids.contains(fragment_id) {
662                bail!(
663                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
664                );
665            }
666
667            // For the relation of NoShuffle (e.g. Materialize and Chain), we need a special
668            // treatment because the upstream and downstream of NoShuffle are always 1-1
669            // correspondence, so we need to clone the reschedule plan to the downstream of all
670            // cascading relations.
671            if no_shuffle_source_fragment_ids.contains(fragment_id) {
672                // This fragment is a NoShuffle's upstream.
673                let mut queue: VecDeque<_> = fragment_dispatcher_map
674                    .get(fragment_id)
675                    .unwrap()
676                    .keys()
677                    .cloned()
678                    .collect();
679
680                while let Some(downstream_id) = queue.pop_front() {
681                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
682                        continue;
683                    }
684
685                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
686                    {
687                        let no_shuffle_downstreams = downstream_fragments
688                            .iter()
689                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
690                            .map(|(fragment_id, _)| fragment_id);
691
692                        queue.extend(no_shuffle_downstreams.copied());
693                    }
694
695                    no_shuffle_reschedule.insert(
696                        downstream_id,
697                        WorkerReschedule {
698                            worker_actor_diff: worker_actor_diff.clone(),
699                        },
700                    );
701                }
702            }
703
704            if fragment
705                .fragment_type_mask
706                .contains(FragmentTypeFlag::Source)
707                && fragment.node.find_stream_source().is_some()
708            {
709                stream_source_fragment_ids.insert(*fragment_id);
710            }
711
712            // Check if the reschedule plan is valid.
713            let current_worker_ids = fragment
714                .actors
715                .iter()
716                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
717                .collect::<HashSet<_>>();
718
719            for (removed, change) in worker_actor_diff {
720                if !current_worker_ids.contains(removed) && change.is_negative() {
721                    bail!(
722                        "no actor on the worker {} of fragment {}",
723                        removed,
724                        fragment_id
725                    );
726                }
727            }
728
729            let added_actor_count: usize = worker_actor_diff
730                .values()
731                .filter(|change| change.is_positive())
732                .cloned()
733                .map(|change| change as usize)
734                .sum();
735
736            let removed_actor_count: usize = worker_actor_diff
737                .values()
738                .filter(|change| change.is_positive())
739                .cloned()
740                .map(|v| v.unsigned_abs())
741                .sum();
742
743            match fragment.distribution_type {
744                FragmentDistributionType::Hash => {
745                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
746                        bail!("can't remove all actors from fragment {}", fragment_id);
747                    }
748                }
749                FragmentDistributionType::Single => {
750                    if added_actor_count != removed_actor_count {
751                        bail!("single distribution fragment only support migration");
752                    }
753                }
754                FragmentDistributionType::Unspecified => unreachable!(),
755            }
756        }
757
758        if !no_shuffle_reschedule.is_empty() {
759            tracing::info!(
760                "reschedule plan rewritten with NoShuffle reschedule {:?}",
761                no_shuffle_reschedule
762            );
763
764            for noshuffle_downstream in no_shuffle_reschedule.keys() {
765                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
766                // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
767                if fragment
768                    .fragment_type_mask
769                    .contains(FragmentTypeFlag::SourceScan)
770                {
771                    let stream_node = &fragment.node;
772                    if let Some((_source_id, upstream_source_fragment_id)) =
773                        stream_node.find_source_backfill()
774                    {
775                        stream_source_backfill_fragment_ids
776                            .insert(fragment.fragment_id, upstream_source_fragment_id);
777                    }
778                }
779            }
780        }
781
782        // Modifications for NoShuffle downstream.
783        reschedule.extend(no_shuffle_reschedule.into_iter());
784
785        Ok(RescheduleContext {
786            actor_map,
787            actor_status,
788            fragment_map,
789            stream_source_fragment_ids,
790            stream_source_backfill_fragment_ids,
791            no_shuffle_target_fragment_ids,
792            no_shuffle_source_fragment_ids,
793            fragment_dispatcher_map,
794            fragment_upstreams: working_set.fragment_upstreams,
795        })
796    }
797
798    /// From the high-level [`WorkerReschedule`] to the low-level reschedule plan [`Reschedule`].
799    ///
800    /// Returns `(reschedule_fragment, applied_reschedules)`
801    /// - `reschedule_fragment`: the generated reschedule plan
802    /// - `applied_reschedules`: the changes that need to be updated to the meta store (`pre_apply_reschedules`, only for V1).
803    ///
804    /// In [normal process of scaling](`GlobalStreamManager::reschedule_actors`), we use the returned values to
805    /// build a [`Command::RescheduleFragment`], which will then flows through the barrier mechanism to perform scaling.
806    /// Meta store is updated after the barrier is collected.
807    ///
808    /// During recovery, we don't need the barrier mechanism, and can directly use the returned values to update meta.
809    pub(crate) async fn analyze_reschedule_plan(
810        &self,
811        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
812        options: RescheduleOptions,
813        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
814    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
815        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
816        let ctx = self
817            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
818            .await?;
819        tracing::debug!("reschedule context: {:#?}", ctx);
820        let reschedules = reschedules;
821
822        // Here, the plan for both upstream and downstream of the NO_SHUFFLE Fragment should already have been populated.
823
824        // Index of actors to create/remove
825        // Fragment Id => ( Actor Id => Worker Id )
826        let (fragment_actors_to_remove, fragment_actors_to_create) =
827            self.arrange_reschedules(&reschedules, &ctx)?;
828
829        let mut fragment_actor_bitmap = HashMap::new();
830        for fragment_id in reschedules.keys() {
831            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
832                // skipping chain fragment, we need to clone the upstream materialize fragment's
833                // mapping later
834                continue;
835            }
836
837            let actors_to_create = fragment_actors_to_create
838                .get(fragment_id)
839                .map(|map| map.keys().copied().collect())
840                .unwrap_or_default();
841
842            let actors_to_remove = fragment_actors_to_remove
843                .get(fragment_id)
844                .map(|map| map.keys().copied().collect())
845                .unwrap_or_default();
846
847            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
848
849            match fragment.distribution_type {
850                FragmentDistributionType::Single => {
851                    // Skip re-balancing action for single distribution (always None)
852                    fragment_actor_bitmap
853                        .insert(fragment.fragment_id as FragmentId, Default::default());
854                }
855                FragmentDistributionType::Hash => {
856                    let actor_vnode = rebalance_actor_vnode(
857                        &fragment.actors,
858                        &actors_to_remove,
859                        &actors_to_create,
860                    );
861
862                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
863                }
864
865                FragmentDistributionType::Unspecified => unreachable!(),
866            }
867        }
868
869        // Index for fragment -> { actor -> worker_id } after reschedule.
870        // Since we need to organize the upstream and downstream relationships of NoShuffle,
871        // we need to organize the actor distribution after a scaling.
872        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
873        for fragment_id in reschedules.keys() {
874            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
875            let mut new_actor_ids = BTreeMap::new();
876            for actor in &fragment.actors {
877                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
878                    && actors_to_remove.contains_key(&actor.actor_id)
879                {
880                    continue;
881                }
882                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
883                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
884            }
885
886            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
887                for (actor_id, worker_id) in actors_to_create {
888                    new_actor_ids.insert(*actor_id, *worker_id);
889                }
890            }
891
892            assert!(
893                !new_actor_ids.is_empty(),
894                "should be at least one actor in fragment {} after rescheduling",
895                fragment_id
896            );
897
898            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
899        }
900
901        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;
902
903        // In order to maintain consistency with the original structure, the upstream and downstream
904        // actors of NoShuffle need to be in the same worker slot and hold the same virtual nodes,
905        // so for the actors after the upstream re-balancing, since we have sorted the actors of the same fragment by id on all workers,
906        // we can identify the corresponding upstream actor with NO_SHUFFLE.
907        // NOTE: There should be more asserts here to ensure correctness.
908        fn arrange_no_shuffle_relation(
909            ctx: &RescheduleContext,
910            fragment_id: &FragmentId,
911            upstream_fragment_id: &FragmentId,
912            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
913            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
914            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
915            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
916            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
917        ) {
918            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
919                return;
920            }
921
922            let fragment = &ctx.fragment_map[fragment_id];
923
924            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
925
926            // build actor group map
927            for upstream_actor in &upstream_fragment.actors {
928                for dispatcher in &upstream_actor.dispatcher {
929                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
930                        let downstream_actor_id =
931                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();
932
933                        // upstream is root
934                        if !ctx
935                            .no_shuffle_target_fragment_ids
936                            .contains(upstream_fragment_id)
937                        {
938                            actor_group_map.insert(
939                                upstream_actor.actor_id,
940                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
941                            );
942                            actor_group_map.insert(
943                                downstream_actor_id,
944                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
945                            );
946                        } else {
947                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];
948
949                            actor_group_map.insert(downstream_actor_id, root_actor_id);
950                        }
951                    }
952                }
953            }
954
955            // If the upstream is a Singleton Fragment, there will be no Bitmap changes
956            let upstream_fragment_bitmap = fragment_updated_bitmap
957                .get(upstream_fragment_id)
958                .cloned()
959                .unwrap_or_default();
960
961            // Question: Is it possible to have Hash Distribution Fragment but the Actor's bitmap remains unchanged?
962            if upstream_fragment.distribution_type == FragmentDistributionType::Single {
963                assert!(
964                    upstream_fragment_bitmap.is_empty(),
965                    "single fragment should have no bitmap updates"
966                );
967            }
968
969            let upstream_fragment_actor_map = fragment_actors_after_reschedule
970                .get(upstream_fragment_id)
971                .cloned()
972                .unwrap();
973
974            let fragment_actor_map = fragment_actors_after_reschedule
975                .get(fragment_id)
976                .cloned()
977                .unwrap();
978
979            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
980
981            // first, find existing actor bitmap, copy them
982            let mut fragment_bitmap = HashMap::new();
983
984            for (actor_id, worker_id) in &fragment_actor_map {
985                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
986                    let root_bitmap = fragment_updated_bitmap
987                        .get(root_fragment)
988                        .expect("root fragment bitmap not found")
989                        .get(root_actor_id)
990                        .cloned()
991                        .expect("root actor bitmap not found");
992
993                    // Copy the bitmap
994                    fragment_bitmap.insert(*actor_id, root_bitmap);
995
996                    no_shuffle_upstream_actor_map
997                        .entry(*actor_id as ActorId)
998                        .or_default()
999                        .insert(*upstream_fragment_id, *root_actor_id);
1000                    no_shuffle_downstream_actors_map
1001                        .entry(*root_actor_id)
1002                        .or_default()
1003                        .insert(*fragment_id, *actor_id);
1004                } else {
1005                    worker_reverse_index
1006                        .entry(*worker_id)
1007                        .or_default()
1008                        .insert(*actor_id);
1009                }
1010            }
1011
1012            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
1013
1014            for (actor_id, worker_id) in &upstream_fragment_actor_map {
1015                if !actor_group_map.contains_key(actor_id) {
1016                    upstream_worker_reverse_index
1017                        .entry(*worker_id)
1018                        .or_default()
1019                        .insert(*actor_id);
1020                }
1021            }
1022
1023            // then, find the rest of the actors and copy the bitmap
1024            for (worker_id, actor_ids) in worker_reverse_index {
1025                let upstream_actor_ids = upstream_worker_reverse_index
1026                    .get(&worker_id)
1027                    .unwrap()
1028                    .clone();
1029
1030                assert_eq!(actor_ids.len(), upstream_actor_ids.len());
1031
1032                for (actor_id, upstream_actor_id) in actor_ids
1033                    .into_iter()
1034                    .zip_eq_debug(upstream_actor_ids.into_iter())
1035                {
1036                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
1037                        None => {
1038                            // single fragment should have no bitmap updates (same as upstream)
1039                            assert_eq!(
1040                                upstream_fragment.distribution_type,
1041                                FragmentDistributionType::Single
1042                            );
1043                        }
1044                        Some(bitmap) => {
1045                            // Copy the bitmap
1046                            fragment_bitmap.insert(actor_id, bitmap);
1047                        }
1048                    }
1049
1050                    no_shuffle_upstream_actor_map
1051                        .entry(actor_id as ActorId)
1052                        .or_default()
1053                        .insert(*upstream_fragment_id, upstream_actor_id);
1054                    no_shuffle_downstream_actors_map
1055                        .entry(upstream_actor_id)
1056                        .or_default()
1057                        .insert(*fragment_id, actor_id);
1058                }
1059            }
1060
1061            match fragment.distribution_type {
1062                FragmentDistributionType::Hash => {}
1063                FragmentDistributionType::Single => {
1064                    // single distribution should update nothing
1065                    assert!(fragment_bitmap.is_empty());
1066                }
1067                FragmentDistributionType::Unspecified => unreachable!(),
1068            }
1069
1070            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
1071                assert_eq!(
1072                    e.entry.get(),
1073                    &e.value,
1074                    "bitmaps derived from different no-shuffle upstreams mismatch"
1075                );
1076            }
1077
1078            // Visit downstream fragments recursively.
1079            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
1080                let no_shuffle_downstreams = downstream_fragments
1081                    .iter()
1082                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
1083                    .map(|(fragment_id, _)| fragment_id);
1084
1085                for downstream_fragment_id in no_shuffle_downstreams {
1086                    arrange_no_shuffle_relation(
1087                        ctx,
1088                        downstream_fragment_id,
1089                        fragment_id,
1090                        fragment_actors_after_reschedule,
1091                        actor_group_map,
1092                        fragment_updated_bitmap,
1093                        no_shuffle_upstream_actor_map,
1094                        no_shuffle_downstream_actors_map,
1095                    );
1096                }
1097            }
1098        }
1099
1100        let mut no_shuffle_upstream_actor_map = HashMap::new();
1101        let mut no_shuffle_downstream_actors_map = HashMap::new();
1102        let mut actor_group_map = HashMap::new();
1103        // For all roots in the upstream and downstream dependency trees of NoShuffle, recursively
1104        // find all correspondences
1105        for fragment_id in reschedules.keys() {
1106            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
1107                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
1108                && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
1109            {
1110                for downstream_fragment_id in downstream_fragments.keys() {
1111                    arrange_no_shuffle_relation(
1112                        &ctx,
1113                        downstream_fragment_id,
1114                        fragment_id,
1115                        &fragment_actors_after_reschedule,
1116                        &mut actor_group_map,
1117                        &mut fragment_actor_bitmap,
1118                        &mut no_shuffle_upstream_actor_map,
1119                        &mut no_shuffle_downstream_actors_map,
1120                    );
1121                }
1122            }
1123        }
1124
1125        tracing::debug!("actor group map {:?}", actor_group_map);
1126
1127        let mut new_created_actors = HashMap::new();
1128        for fragment_id in reschedules.keys() {
1129            let actors_to_create = fragment_actors_to_create
1130                .get(fragment_id)
1131                .cloned()
1132                .unwrap_or_default();
1133
1134            let fragment = &ctx.fragment_map[fragment_id];
1135
1136            assert!(!fragment.actors.is_empty());
1137
1138            for actor_to_create in &actors_to_create {
1139                let new_actor_id = actor_to_create.0;
1140                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();
1141
1142                // This should be assigned before the `modify_actor_upstream_and_downstream` call,
1143                // because we need to use the new actor id to find the upstream and
1144                // downstream in the NoShuffle relationship
1145                new_actor.actor_id = *new_actor_id;
1146
1147                Self::modify_actor_upstream_and_downstream(
1148                    &ctx,
1149                    &fragment_actors_to_remove,
1150                    &fragment_actors_to_create,
1151                    &fragment_actor_bitmap,
1152                    &no_shuffle_downstream_actors_map,
1153                    &mut new_actor,
1154                    &mut dispatchers,
1155                )?;
1156
1157                if let Some(bitmap) = fragment_actor_bitmap
1158                    .get(fragment_id)
1159                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
1160                {
1161                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
1162                }
1163
1164                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
1165            }
1166        }
1167
1168        // For stream source & source backfill fragments, we need to reallocate the splits.
1169        // Because we are in the Pause state, so it's no problem to reallocate
1170        let mut fragment_actor_splits = HashMap::new();
1171        for fragment_id in reschedules.keys() {
1172            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1173
1174            if ctx.stream_source_fragment_ids.contains(fragment_id) {
1175                let fragment = &ctx.fragment_map[fragment_id];
1176
1177                let prev_actor_ids = fragment
1178                    .actors
1179                    .iter()
1180                    .map(|actor| actor.actor_id)
1181                    .collect_vec();
1182
1183                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1184
1185                let actor_splits = self
1186                    .env
1187                    .shared_actor_infos()
1188                    .migrate_splits_for_source_actors(
1189                        *fragment_id,
1190                        &prev_actor_ids,
1191                        &curr_actor_ids,
1192                    )?;
1193
1194                tracing::debug!(
1195                    "source actor splits: {:?}, fragment_id: {}",
1196                    actor_splits,
1197                    fragment_id
1198                );
1199                fragment_actor_splits.insert(*fragment_id, actor_splits);
1200            }
1201        }
1202        // We use 2 iterations to make sure source actors are migrated first, and then align backfill actors
1203        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
1204            for fragment_id in reschedules.keys() {
1205                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1206
1207                if let Some(upstream_source_fragment_id) =
1208                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
1209                {
1210                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1211
1212                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
1213                        *fragment_id,
1214                        *upstream_source_fragment_id,
1215                        &curr_actor_ids,
1216                        &fragment_actor_splits,
1217                        &no_shuffle_upstream_actor_map,
1218                    )?;
1219                    tracing::debug!(
1220                        "source backfill actor splits: {:?}, fragment_id: {}",
1221                        actor_splits,
1222                        fragment_id
1223                    );
1224                    fragment_actor_splits.insert(*fragment_id, actor_splits);
1225                }
1226            }
1227        }
1228
1229        // Generate fragment reschedule plan
1230        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
1231            HashMap::with_capacity(reschedules.len());
1232
1233        for (fragment_id, _) in reschedules {
1234            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();
1235
1236            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
1237                for (actor_id, worker_id) in actor_worker_maps {
1238                    actors_to_create
1239                        .entry(worker_id)
1240                        .or_default()
1241                        .push(actor_id);
1242                }
1243            }
1244
1245            let actors_to_remove = fragment_actors_to_remove
1246                .get(&fragment_id)
1247                .cloned()
1248                .unwrap_or_default()
1249                .into_keys()
1250                .collect();
1251
1252            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];
1253
1254            assert!(!actors_after_reschedule.is_empty());
1255
1256            let fragment = &ctx.fragment_map[&fragment_id];
1257
1258            let in_degree_types: HashSet<_> = ctx
1259                .fragment_upstreams
1260                .get(&(fragment_id as _))
1261                .map(|upstreams| upstreams.values())
1262                .into_iter()
1263                .flatten()
1264                .cloned()
1265                .collect();
1266
1267            let upstream_dispatcher_mapping = match fragment.distribution_type {
1268                FragmentDistributionType::Hash => {
1269                    if !in_degree_types.contains(&DispatcherType::Hash) {
1270                        None
1271                    } else {
1272                        // Changes of the bitmap must occur in the case of HashDistribution
1273                        Some(ActorMapping::from_bitmaps(
1274                            &fragment_actor_bitmap[&fragment_id],
1275                        ))
1276                    }
1277                }
1278
1279                FragmentDistributionType::Single => {
1280                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
1281                    None
1282                }
1283                FragmentDistributionType::Unspecified => unreachable!(),
1284            };
1285
1286            let mut upstream_fragment_dispatcher_set = BTreeSet::new();
1287
1288            {
1289                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
1290                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
1291                        match upstream_dispatcher_type {
1292                            DispatcherType::NoShuffle => {}
1293                            _ => {
1294                                upstream_fragment_dispatcher_set.insert((
1295                                    *upstream_fragment_id as FragmentId,
1296                                    fragment.fragment_id as DispatcherId,
1297                                ));
1298                            }
1299                        }
1300                    }
1301                }
1302            }
1303
1304            let downstream_fragment_ids = if let Some(downstream_fragments) =
1305                ctx.fragment_dispatcher_map.get(&fragment_id)
1306            {
1307                // Skip fragments' no-shuffle downstream, as there's no need to update the merger
1308                // (receiver) of a no-shuffle downstream
1309                downstream_fragments
1310                    .iter()
1311                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
1312                    .map(|(fragment_id, _)| *fragment_id)
1313                    .collect_vec()
1314            } else {
1315                vec![]
1316            };
1317
1318            let vnode_bitmap_updates = match fragment.distribution_type {
1319                FragmentDistributionType::Hash => {
1320                    let mut vnode_bitmap_updates =
1321                        fragment_actor_bitmap.remove(&fragment_id).unwrap();
1322
1323                    // We need to keep the bitmaps from changed actors only,
1324                    // otherwise the barrier will become very large with many actors
1325                    for actor_id in actors_after_reschedule.keys() {
1326                        assert!(vnode_bitmap_updates.contains_key(actor_id));
1327
1328                        // retain actor
1329                        if let Some(actor) = ctx.actor_map.get(actor_id) {
1330                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();
1331
1332                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
1333                                && prev_bitmap.eq(bitmap)
1334                            {
1335                                vnode_bitmap_updates.remove(actor_id);
1336                            }
1337                        }
1338                    }
1339
1340                    vnode_bitmap_updates
1341                }
1342                FragmentDistributionType::Single => HashMap::new(),
1343                FragmentDistributionType::Unspecified => unreachable!(),
1344            };
1345
1346            let upstream_fragment_dispatcher_ids =
1347                upstream_fragment_dispatcher_set.into_iter().collect_vec();
1348
1349            let actor_splits = fragment_actor_splits
1350                .get(&fragment_id)
1351                .cloned()
1352                .unwrap_or_default();
1353
1354            let mut cdc_table_id = None;
1355            let cdc_table_snapshot_split_assignment = if fragment
1356                .fragment_type_mask
1357                .contains(FragmentTypeFlag::StreamCdcScan)
1358            {
1359                cdc_table_id = Some(fragment.job_id);
1360                assign_cdc_table_snapshot_splits_impl(
1361                    fragment.job_id,
1362                    fragment_actors_after_reschedule
1363                        .get(&fragment_id)
1364                        .unwrap()
1365                        .keys()
1366                        .copied()
1367                        .collect(),
1368                    self.env.meta_store_ref(),
1369                    None,
1370                )
1371                .await?
1372            } else {
1373                HashMap::default()
1374            };
1375
1376            reschedule_fragment.insert(
1377                fragment_id,
1378                Reschedule {
1379                    added_actors: actors_to_create,
1380                    removed_actors: actors_to_remove,
1381                    vnode_bitmap_updates,
1382                    upstream_fragment_dispatcher_ids,
1383                    upstream_dispatcher_mapping,
1384                    downstream_fragment_ids,
1385                    actor_splits,
1386                    newly_created_actors: Default::default(),
1387                    cdc_table_snapshot_split_assignment,
1388                    cdc_table_id,
1389                },
1390            );
1391        }
1392
1393        let mut fragment_created_actors = HashMap::new();
1394        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
1395            let mut created_actors = HashMap::new();
1396            for (actor_id, worker_id) in actors_to_create {
1397                let actor = new_created_actors.get(actor_id).cloned().unwrap();
1398                created_actors.insert(*actor_id, (actor, *worker_id));
1399            }
1400
1401            fragment_created_actors.insert(*fragment_id, created_actors);
1402        }
1403
1404        for (fragment_id, to_create) in fragment_created_actors {
1405            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
1406            reschedule.newly_created_actors = to_create;
1407        }
1408        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);
1409
1410        Ok(reschedule_fragment)
1411    }
1412
1413    #[expect(clippy::type_complexity)]
1414    fn arrange_reschedules(
1415        &self,
1416        reschedule: &HashMap<FragmentId, WorkerReschedule>,
1417        ctx: &RescheduleContext,
1418    ) -> MetaResult<(
1419        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1420        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1421    )> {
1422        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
1423        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());
1424
1425        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
1426            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
1427
1428            // Actor Id => Worker Id
1429            let mut actors_to_remove = BTreeMap::new();
1430            let mut actors_to_create = BTreeMap::new();
1431
1432            // NOTE(important): The value needs to be a BTreeSet to ensure that the actors on the worker are sorted in ascending order.
1433            let mut worker_to_actors = HashMap::new();
1434
1435            for actor in &fragment.actors {
1436                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
1437                worker_to_actors
1438                    .entry(worker_id)
1439                    .or_insert(BTreeSet::new())
1440                    .insert(actor.actor_id as ActorId);
1441            }
1442
1443            let decreased_actor_count = worker_actor_diff
1444                .iter()
1445                .filter(|(_, change)| change.is_negative())
1446                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));
1447
1448            for (worker_id, n) in decreased_actor_count {
1449                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
1450                    if actor_ids.len() < n {
1451                        bail!(
1452                            "plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",
1453                            fragment_id,
1454                            worker_id,
1455                            actor_ids.len(),
1456                            n
1457                        );
1458                    }
1459
1460                    let removed_actors: Vec<_> = actor_ids
1461                        .iter()
1462                        .skip(actor_ids.len().saturating_sub(n))
1463                        .cloned()
1464                        .collect();
1465
1466                    for actor in removed_actors {
1467                        actors_to_remove.insert(actor, *worker_id);
1468                    }
1469                }
1470            }
1471
1472            let increased_actor_count = worker_actor_diff
1473                .iter()
1474                .filter(|(_, change)| change.is_positive());
1475
1476            for (worker, n) in increased_actor_count {
1477                for _ in 0..*n {
1478                    let id = self
1479                        .env
1480                        .id_gen_manager()
1481                        .generate_interval::<{ IdCategory::Actor }>(1)
1482                        as ActorId;
1483                    actors_to_create.insert(id, *worker);
1484                }
1485            }
1486
1487            if !actors_to_remove.is_empty() {
1488                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
1489            }
1490
1491            if !actors_to_create.is_empty() {
1492                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
1493            }
1494        }
1495
1496        // sanity checking
1497        for actors_to_remove in fragment_actors_to_remove.values() {
1498            for actor_id in actors_to_remove.keys() {
1499                let actor = ctx.actor_map.get(actor_id).unwrap();
1500                for dispatcher in &actor.dispatcher {
1501                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
1502                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");
1503
1504                        let _should_exists = fragment_actors_to_remove
1505                            .get(&(dispatcher.dispatcher_id as FragmentId))
1506                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
1507                            .get(downstream_actor_id)
1508                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
1509                    }
1510                }
1511            }
1512        }
1513
1514        Ok((fragment_actors_to_remove, fragment_actors_to_create))
1515    }
1516
1517    /// Modifies the upstream and downstream actors of the new created actor according to the
1518    /// overall changes, and is used to handle cascading updates
1519    fn modify_actor_upstream_and_downstream(
1520        ctx: &RescheduleContext,
1521        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1522        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1523        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
1524        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
1525        new_actor: &mut StreamActor,
1526        dispatchers: &mut Vec<PbDispatcher>,
1527    ) -> MetaResult<()> {
1528        // Update downstream actor ids
1529        for dispatcher in dispatchers {
1530            let downstream_fragment_id = dispatcher
1531                .downstream_actor_id
1532                .iter()
1533                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
1534                .dedup()
1535                .exactly_one()
1536                .unwrap() as FragmentId;
1537
1538            let downstream_fragment_actors_to_remove =
1539                fragment_actors_to_remove.get(&downstream_fragment_id);
1540            let downstream_fragment_actors_to_create =
1541                fragment_actors_to_create.get(&downstream_fragment_id);
1542
1543            match dispatcher.r#type() {
1544                d @ (PbDispatcherType::Hash
1545                | PbDispatcherType::Simple
1546                | PbDispatcherType::Broadcast) => {
1547                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
1548                    {
1549                        dispatcher
1550                            .downstream_actor_id
1551                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
1552                    }
1553
1554                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
1555                    {
1556                        dispatcher
1557                            .downstream_actor_id
1558                            .extend(downstream_actors_to_create.keys().cloned())
1559                    }
1560
1561                    // There should be still exactly one downstream actor
1562                    if d == PbDispatcherType::Simple {
1563                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1564                    }
1565                }
1566                PbDispatcherType::NoShuffle => {
1567                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1568                    let downstream_actor_id = no_shuffle_downstream_actors_map
1569                        .get(&new_actor.actor_id)
1570                        .and_then(|map| map.get(&downstream_fragment_id))
1571                        .unwrap();
1572                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
1573                }
1574                PbDispatcherType::Unspecified => unreachable!(),
1575            }
1576
1577            if let Some(mapping) = dispatcher.hash_mapping.as_mut()
1578                && let Some(downstream_updated_bitmap) =
1579                    fragment_actor_bitmap.get(&downstream_fragment_id)
1580            {
1581                // If downstream scale in/out
1582                *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
1583            }
1584        }
1585
1586        Ok(())
1587    }
1588
1589    #[await_tree::instrument]
1590    pub async fn post_apply_reschedule(
1591        &self,
1592        reschedules: &HashMap<FragmentId, Reschedule>,
1593        post_updates: &JobReschedulePostUpdates,
1594    ) -> MetaResult<()> {
1595        // Update fragment info after rescheduling in meta store.
1596        self.metadata_manager
1597            .post_apply_reschedules(reschedules.clone(), post_updates)
1598            .await?;
1599
1600        // Update serving fragment info after rescheduling in meta store.
1601        if !reschedules.is_empty() {
1602            let workers = self
1603                .metadata_manager
1604                .list_active_serving_compute_nodes()
1605                .await?;
1606            let streaming_parallelisms = self
1607                .metadata_manager
1608                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))?;
1609            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
1610            let max_serving_parallelism = self
1611                .env
1612                .session_params_manager_impl_ref()
1613                .get_params()
1614                .await
1615                .batch_parallelism()
1616                .map(|p| p.get());
1617            let (upserted, failed) = serving_worker_slot_mapping.upsert(
1618                streaming_parallelisms,
1619                &workers,
1620                max_serving_parallelism,
1621            );
1622            if !upserted.is_empty() {
1623                tracing::debug!(
1624                    "Update serving vnode mapping for fragments {:?}.",
1625                    upserted.keys()
1626                );
1627                self.env
1628                    .notification_manager()
1629                    .notify_frontend_without_version(
1630                        Operation::Update,
1631                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1632                            mappings: to_fragment_worker_slot_mapping(&upserted),
1633                        }),
1634                    );
1635            }
1636            if !failed.is_empty() {
1637                tracing::debug!(
1638                    "Fail to update serving vnode mapping for fragments {:?}.",
1639                    failed
1640                );
1641                self.env
1642                    .notification_manager()
1643                    .notify_frontend_without_version(
1644                        Operation::Delete,
1645                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1646                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
1647                        }),
1648                    );
1649            }
1650        }
1651
1652        Ok(())
1653    }
1654
1655    pub async fn generate_job_reschedule_plan(
1656        &self,
1657        policy: JobReschedulePolicy,
1658        generate_plan_for_cdc_table_backfill: bool,
1659    ) -> MetaResult<JobReschedulePlan> {
1660        type VnodeCount = usize;
1661
1662        let JobReschedulePolicy { targets } = policy;
1663
1664        let workers = self
1665            .metadata_manager
1666            .list_active_streaming_compute_nodes()
1667            .await?;
1668
1669        // The `schedulable` field should eventually be replaced by resource groups like `unschedulable`
1670        let workers: HashMap<_, _> = workers
1671            .into_iter()
1672            .filter(|worker| worker.is_streaming_schedulable())
1673            .map(|worker| (worker.id, worker))
1674            .collect();
1675
1676        #[derive(Debug)]
1677        struct JobUpdate {
1678            filtered_worker_ids: BTreeSet<WorkerId>,
1679            parallelism: TableParallelism,
1680        }
1681
1682        let mut job_parallelism_updates = HashMap::new();
1683
1684        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
1685            parallelism_updates: Default::default(),
1686            resource_group_updates: Default::default(),
1687        };
1688
1689        for (
1690            job_id,
1691            JobRescheduleTarget {
1692                parallelism: parallelism_update,
1693                resource_group: resource_group_update,
1694            },
1695        ) in &targets
1696        {
1697            let parallelism = match parallelism_update {
1698                JobParallelismTarget::Update(parallelism) => *parallelism,
1699                JobParallelismTarget::Refresh => {
1700                    let parallelism = self
1701                        .metadata_manager
1702                        .catalog_controller
1703                        .get_job_streaming_parallelisms(*job_id as _)
1704                        .await?;
1705
1706                    parallelism.into()
1707                }
1708            };
1709
1710            job_reschedule_post_updates
1711                .parallelism_updates
1712                .insert(TableId::from(*job_id), parallelism);
1713
1714            let current_resource_group = match resource_group_update {
1715                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
1716                    job_reschedule_post_updates.resource_group_updates.insert(
1717                        *job_id as ObjectId,
1718                        Some(specific_resource_group.to_owned()),
1719                    );
1720
1721                    specific_resource_group.to_owned()
1722                }
1723                JobResourceGroupTarget::Update(None) => {
1724                    let database_resource_group = self
1725                        .metadata_manager
1726                        .catalog_controller
1727                        .get_existing_job_database_resource_group(*job_id as _)
1728                        .await?;
1729
1730                    job_reschedule_post_updates
1731                        .resource_group_updates
1732                        .insert(*job_id as ObjectId, None);
1733                    database_resource_group
1734                }
1735                JobResourceGroupTarget::Keep => {
1736                    self.metadata_manager
1737                        .catalog_controller
1738                        .get_existing_job_resource_group(*job_id as _)
1739                        .await?
1740                }
1741            };
1742
1743            let filtered_worker_ids =
1744                filter_workers_by_resource_group(&workers, current_resource_group.as_str());
1745
1746            if filtered_worker_ids.is_empty() {
1747                bail!("Cannot resize streaming_job {job_id} to empty worker set")
1748            }
1749
1750            job_parallelism_updates.insert(
1751                *job_id,
1752                JobUpdate {
1753                    filtered_worker_ids,
1754                    parallelism,
1755                },
1756            );
1757        }
1758
1759        // index for no shuffle relation
1760        let mut no_shuffle_source_fragment_ids = HashSet::new();
1761        let mut no_shuffle_target_fragment_ids = HashSet::new();
1762
1763        // index for fragment_id -> (distribution_type, vnode_count)
1764        let mut fragment_distribution_map = HashMap::new();
1765        // index for actor -> worker id
1766        let mut actor_location = HashMap::new();
1767        // index for table_id -> [fragment_id]
1768        let mut table_fragment_id_map = HashMap::new();
1769        // index for fragment_id -> [actor_id]
1770        let mut fragment_actor_id_map = HashMap::new();
1771
1772        async fn build_index(
1773            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
1774            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
1775            fragment_distribution_map: &mut HashMap<
1776                FragmentId,
1777                (FragmentDistributionType, VnodeCount, bool),
1778            >,
1779            actor_location: &mut HashMap<ActorId, WorkerId>,
1780            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
1781            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
1782            mgr: &MetadataManager,
1783            table_ids: Vec<ObjectId>,
1784            generate_plan_only_for_cdc_table_backfill: bool,
1785        ) -> Result<(), MetaError> {
1786            let RescheduleWorkingSet {
1787                fragments,
1788                actors,
1789                actor_dispatchers: _actor_dispatchers,
1790                fragment_downstreams,
1791                fragment_upstreams: _fragment_upstreams,
1792                related_jobs: _related_jobs,
1793                job_resource_groups: _job_resource_groups,
1794            } = mgr
1795                .catalog_controller
1796                .resolve_working_set_for_reschedule_tables(table_ids)
1797                .await?;
1798
1799            for (fragment_id, downstreams) in fragment_downstreams {
1800                for (downstream_fragment_id, dispatcher_type) in downstreams {
1801                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
1802                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
1803                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
1804                    }
1805                }
1806            }
1807
1808            for (fragment_id, fragment) in fragments {
1809                let is_cdc_backfill_v2_fragment =
1810                    FragmentTypeMask::from(fragment.fragment_type_mask)
1811                        .contains(FragmentTypeFlag::StreamCdcScan);
1812                if generate_plan_only_for_cdc_table_backfill && !is_cdc_backfill_v2_fragment {
1813                    continue;
1814                }
1815                fragment_distribution_map.insert(
1816                    fragment_id as FragmentId,
1817                    (
1818                        FragmentDistributionType::from(fragment.distribution_type),
1819                        fragment.vnode_count as _,
1820                        is_cdc_backfill_v2_fragment,
1821                    ),
1822                );
1823
1824                table_fragment_id_map
1825                    .entry(fragment.job_id as u32)
1826                    .or_default()
1827                    .insert(fragment_id as FragmentId);
1828            }
1829
1830            for (actor_id, actor) in actors {
1831                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
1832                fragment_actor_id_map
1833                    .entry(actor.fragment_id as FragmentId)
1834                    .or_default()
1835                    .insert(actor_id as ActorId);
1836            }
1837
1838            Ok(())
1839        }
1840
1841        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();
1842
1843        build_index(
1844            &mut no_shuffle_source_fragment_ids,
1845            &mut no_shuffle_target_fragment_ids,
1846            &mut fragment_distribution_map,
1847            &mut actor_location,
1848            &mut table_fragment_id_map,
1849            &mut fragment_actor_id_map,
1850            &self.metadata_manager,
1851            table_ids,
1852            generate_plan_for_cdc_table_backfill,
1853        )
1854        .await?;
1855        tracing::debug!(
1856            ?job_reschedule_post_updates,
1857            ?job_parallelism_updates,
1858            ?no_shuffle_source_fragment_ids,
1859            ?no_shuffle_target_fragment_ids,
1860            ?fragment_distribution_map,
1861            ?actor_location,
1862            ?table_fragment_id_map,
1863            ?fragment_actor_id_map,
1864            "generate_table_resize_plan, after build_index"
1865        );
1866
1867        let adaptive_parallelism_strategy = self
1868            .env
1869            .system_params_reader()
1870            .await
1871            .adaptive_parallelism_strategy();
1872
1873        let mut target_plan = HashMap::new();
1874
1875        for (
1876            table_id,
1877            JobUpdate {
1878                filtered_worker_ids,
1879                parallelism,
1880            },
1881        ) in job_parallelism_updates
1882        {
1883            let assigner = AssignerBuilder::new(table_id).build();
1884
1885            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();
1886
1887            let available_worker_slots = workers
1888                .iter()
1889                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
1890                .map(|(_, worker)| {
1891                    (
1892                        worker.id as WorkerId,
1893                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
1894                    )
1895                })
1896                .collect::<BTreeMap<_, _>>();
1897
1898            for fragment_id in fragment_map {
1899                // Currently, all of our NO_SHUFFLE relation propagations are only transmitted from upstream to downstream.
1900                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
1901                    continue;
1902                }
1903
1904                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();
1905
1906                for actor_id in &fragment_actor_id_map[&fragment_id] {
1907                    let worker_id = actor_location[actor_id];
1908                    *fragment_slots.entry(worker_id).or_default() += 1;
1909                }
1910
1911                let available_slot_count: usize = available_worker_slots
1912                    .values()
1913                    .cloned()
1914                    .map(NonZeroUsize::get)
1915                    .sum();
1916
1917                if available_slot_count == 0 {
1918                    bail!(
1919                        "No schedulable slots available for fragment {}",
1920                        fragment_id
1921                    );
1922                }
1923
1924                let (dist, vnode_count, is_cdc_backfill_v2_fragment) =
1925                    fragment_distribution_map[&fragment_id];
1926                let max_parallelism = vnode_count;
1927                let fragment_parallelism_strategy = if generate_plan_for_cdc_table_backfill {
1928                    assert!(is_cdc_backfill_v2_fragment);
1929                    let TableParallelism::Fixed(new_parallelism) = parallelism else {
1930                        return Err(anyhow::anyhow!(
1931                            "invalid new parallelism {:?}, expect fixed parallelism",
1932                            parallelism
1933                        )
1934                        .into());
1935                    };
1936                    if new_parallelism > max_parallelism || new_parallelism == 0 {
1937                        return Err(anyhow::anyhow!(
1938                            "invalid new parallelism {}, max parallelism {}",
1939                            new_parallelism,
1940                            max_parallelism
1941                        )
1942                        .into());
1943                    }
1944                    TableParallelism::Fixed(new_parallelism)
1945                } else if is_cdc_backfill_v2_fragment {
1946                    TableParallelism::Fixed(fragment_actor_id_map[&fragment_id].len())
1947                } else {
1948                    parallelism
1949                };
1950                match dist {
1951                    FragmentDistributionType::Unspecified => unreachable!(),
1952                    FragmentDistributionType::Single => {
1953                        let (single_worker_id, should_be_one) = fragment_slots
1954                            .iter()
1955                            .exactly_one()
1956                            .expect("single fragment should have only one worker slot");
1957
1958                        assert_eq!(*should_be_one, 1);
1959
1960                        let assignment =
1961                            assigner.count_actors_per_worker(&available_worker_slots, 1);
1962
1963                        let (chosen_target_worker_id, should_be_one) =
1964                            assignment.iter().exactly_one().ok().with_context(|| {
1965                                format!(
1966                                    "Cannot find a single target worker for fragment {fragment_id}"
1967                                )
1968                            })?;
1969
1970                        assert_eq!(*should_be_one, 1);
1971
1972                        if *chosen_target_worker_id == *single_worker_id {
1973                            tracing::debug!(
1974                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
1975                            );
1976                            continue;
1977                        }
1978
1979                        target_plan.insert(
1980                            fragment_id,
1981                            WorkerReschedule {
1982                                worker_actor_diff: BTreeMap::from_iter(vec![
1983                                    (*chosen_target_worker_id, 1),
1984                                    (*single_worker_id, -1),
1985                                ]),
1986                            },
1987                        );
1988                    }
1989                    FragmentDistributionType::Hash => match fragment_parallelism_strategy {
1990                        TableParallelism::Adaptive => {
1991                            let target_slot_count = adaptive_parallelism_strategy
1992                                .compute_target_parallelism(available_slot_count);
1993
1994                            if target_slot_count > max_parallelism {
1995                                tracing::warn!(
1996                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
1997                                );
1998
1999                                let target_worker_slots = assigner.count_actors_per_worker(
2000                                    &available_worker_slots,
2001                                    max_parallelism,
2002                                );
2003
2004                                target_plan.insert(
2005                                    fragment_id,
2006                                    Self::diff_worker_slot_changes(
2007                                        &fragment_slots,
2008                                        &target_worker_slots,
2009                                    ),
2010                                );
2011                            } else if available_slot_count != target_slot_count {
2012                                tracing::info!(
2013                                    "available parallelism for table {table_id} is limit by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
2014                                );
2015
2016                                let target_worker_slots = assigner.count_actors_per_worker(
2017                                    &available_worker_slots,
2018                                    target_slot_count,
2019                                );
2020
2021                                target_plan.insert(
2022                                    fragment_id,
2023                                    Self::diff_worker_slot_changes(
2024                                        &fragment_slots,
2025                                        &target_worker_slots,
2026                                    ),
2027                                );
2028                            } else {
2029                                let available_worker_slots = available_worker_slots
2030                                    .iter()
2031                                    .map(|(worker_id, v)| (*worker_id, v.get()))
2032                                    .collect();
2033
2034                                target_plan.insert(
2035                                    fragment_id,
2036                                    Self::diff_worker_slot_changes(
2037                                        &fragment_slots,
2038                                        &available_worker_slots,
2039                                    ),
2040                                );
2041                            }
2042                        }
2043                        TableParallelism::Fixed(mut n) => {
2044                            if n > max_parallelism {
2045                                tracing::warn!(
2046                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
2047                                );
2048                                n = max_parallelism
2049                            }
2050
2051                            let target_worker_slots =
2052                                assigner.count_actors_per_worker(&available_worker_slots, n);
2053
2054                            target_plan.insert(
2055                                fragment_id,
2056                                Self::diff_worker_slot_changes(
2057                                    &fragment_slots,
2058                                    &target_worker_slots,
2059                                ),
2060                            );
2061                        }
2062                        TableParallelism::Custom => {
2063                            // skipping for custom
2064                        }
2065                    },
2066                }
2067            }
2068        }
2069
2070        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
2071        tracing::debug!(
2072            ?target_plan,
2073            "generate_table_resize_plan finished target_plan"
2074        );
2075        if generate_plan_for_cdc_table_backfill {
2076            job_reschedule_post_updates.resource_group_updates = HashMap::default();
2077            job_reschedule_post_updates.parallelism_updates = HashMap::default();
2078        }
2079        Ok(JobReschedulePlan {
2080            reschedules: target_plan,
2081            post_updates: job_reschedule_post_updates,
2082        })
2083    }
2084
2085    fn diff_worker_slot_changes(
2086        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
2087        target_worker_slots: &BTreeMap<WorkerId, usize>,
2088    ) -> WorkerReschedule {
2089        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2090        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2091
2092        for (&worker_id, &target_slots) in target_worker_slots {
2093            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);
2094
2095            if target_slots > current_slots {
2096                increased_actor_count.insert(worker_id, target_slots - current_slots);
2097            }
2098        }
2099
2100        for (&worker_id, &current_slots) in fragment_worker_slots {
2101            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);
2102
2103            if current_slots > target_slots {
2104                decreased_actor_count.insert(worker_id, current_slots - target_slots);
2105            }
2106        }
2107
2108        let worker_ids: HashSet<_> = increased_actor_count
2109            .keys()
2110            .chain(decreased_actor_count.keys())
2111            .cloned()
2112            .collect();
2113
2114        let mut worker_actor_diff = BTreeMap::new();
2115
2116        for worker_id in worker_ids {
2117            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2118            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2119            let change = increased - decreased;
2120
2121            assert_ne!(change, 0);
2122
2123            worker_actor_diff.insert(worker_id, change);
2124        }
2125
2126        WorkerReschedule { worker_actor_diff }
2127    }
2128
2129    fn build_no_shuffle_relation_index(
2130        actor_map: &HashMap<ActorId, CustomActorInfo>,
2131        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
2132        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
2133    ) {
2134        let mut fragment_cache = HashSet::new();
2135        for actor in actor_map.values() {
2136            if fragment_cache.contains(&actor.fragment_id) {
2137                continue;
2138            }
2139
2140            for dispatcher in &actor.dispatcher {
2141                for downstream_actor_id in &dispatcher.downstream_actor_id {
2142                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2143                        // Checking for no shuffle dispatchers
2144                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
2145                            no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
2146                            no_shuffle_target_fragment_ids
2147                                .insert(downstream_actor.fragment_id as FragmentId);
2148                        }
2149                    }
2150                }
2151            }
2152
2153            fragment_cache.insert(actor.fragment_id);
2154        }
2155    }
2156
2157    fn build_fragment_dispatcher_index(
2158        actor_map: &HashMap<ActorId, CustomActorInfo>,
2159        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
2160    ) {
2161        for actor in actor_map.values() {
2162            for dispatcher in &actor.dispatcher {
2163                for downstream_actor_id in &dispatcher.downstream_actor_id {
2164                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2165                        fragment_dispatcher_map
2166                            .entry(actor.fragment_id as FragmentId)
2167                            .or_default()
2168                            .insert(
2169                                downstream_actor.fragment_id as FragmentId,
2170                                dispatcher.r#type().into(),
2171                            );
2172                    }
2173                }
2174            }
2175        }
2176    }
2177
2178    pub fn resolve_no_shuffle_upstream_tables(
2179        fragment_ids: HashSet<FragmentId>,
2180        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2181        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2182        fragment_to_table: &HashMap<FragmentId, TableId>,
2183        fragment_upstreams: &HashMap<
2184            risingwave_meta_model::FragmentId,
2185            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2186        >,
2187        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
2188    ) -> MetaResult<()> {
2189        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();
2190
2191        let mut fragment_ids = fragment_ids;
2192
2193        // We trace the upstreams of each downstream under the hierarchy until we reach the top
2194        // for every no_shuffle relation.
2195        while let Some(fragment_id) = queue.pop_front() {
2196            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2197                continue;
2198            }
2199
2200            // for upstream
2201            for upstream_fragment_id in fragment_upstreams
2202                .get(&(fragment_id as _))
2203                .map(|upstreams| upstreams.keys())
2204                .into_iter()
2205                .flatten()
2206            {
2207                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2208                let upstream_fragment_id = &upstream_fragment_id;
2209                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2210                    continue;
2211                }
2212
2213                let table_id = &fragment_to_table[&fragment_id];
2214                let upstream_table_id = &fragment_to_table[upstream_fragment_id];
2215
2216                // Only custom parallelism will be propagated to the no shuffle upstream.
2217                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
2218                    if let Some(upstream_table_parallelism) =
2219                        table_parallelisms.get(upstream_table_id)
2220                    {
2221                        if upstream_table_parallelism != &TableParallelism::Custom {
2222                            bail!(
2223                                "Cannot change upstream table {} from {:?} to {:?}",
2224                                upstream_table_id,
2225                                upstream_table_parallelism,
2226                                TableParallelism::Custom
2227                            )
2228                        }
2229                    } else {
2230                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
2231                    }
2232                }
2233
2234                fragment_ids.insert(*upstream_fragment_id);
2235                queue.push_back(*upstream_fragment_id);
2236            }
2237        }
2238
2239        let downstream_fragment_ids = fragment_ids
2240            .iter()
2241            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));
2242
2243        let downstream_table_ids = downstream_fragment_ids
2244            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
2245            .collect::<HashSet<_>>();
2246
2247        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));
2248
2249        Ok(())
2250    }
2251
2252    pub fn resolve_no_shuffle_upstream_fragments<T>(
2253        reschedule: &mut HashMap<FragmentId, T>,
2254        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2255        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2256        fragment_upstreams: &HashMap<
2257            risingwave_meta_model::FragmentId,
2258            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2259        >,
2260    ) -> MetaResult<()>
2261    where
2262        T: Clone + Eq,
2263    {
2264        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();
2265
2266        // We trace the upstreams of each downstream under the hierarchy until we reach the top
2267        // for every no_shuffle relation.
2268        while let Some(fragment_id) = queue.pop_front() {
2269            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2270                continue;
2271            }
2272
2273            // for upstream
2274            for upstream_fragment_id in fragment_upstreams
2275                .get(&(fragment_id as _))
2276                .map(|upstreams| upstreams.keys())
2277                .into_iter()
2278                .flatten()
2279            {
2280                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2281                let upstream_fragment_id = &upstream_fragment_id;
2282                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2283                    continue;
2284                }
2285
2286                let reschedule_plan = &reschedule[&fragment_id];
2287
2288                if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
2289                    if upstream_reschedule_plan != reschedule_plan {
2290                        bail!(
2291                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
2292                            fragment_id,
2293                            upstream_fragment_id
2294                        );
2295                    }
2296
2297                    continue;
2298                }
2299
2300                reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());
2301
2302                queue.push_back(*upstream_fragment_id);
2303            }
2304        }
2305
2306        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));
2307
2308        Ok(())
2309    }
2310
2311    pub async fn resolve_related_no_shuffle_jobs(
2312        &self,
2313        jobs: &[TableId],
2314    ) -> MetaResult<HashSet<TableId>> {
2315        let RescheduleWorkingSet { related_jobs, .. } = self
2316            .metadata_manager
2317            .catalog_controller
2318            .resolve_working_set_for_reschedule_tables(
2319                jobs.iter().map(|id| id.table_id as _).collect(),
2320            )
2321            .await?;
2322
2323        Ok(related_jobs
2324            .keys()
2325            .map(|id| TableId::new(*id as _))
2326            .collect())
2327    }
2328}
2329
/// How a reschedule should treat a streaming job's parallelism.
#[derive(Debug, Clone)]
pub enum JobParallelismTarget {
    /// Change the job's parallelism to the given value.
    Update(TableParallelism),
    /// Leave the parallelism setting as-is. Automatic parallelism control
    /// (`trigger_parallelism_control`) uses this variant; presumably the plan is
    /// re-derived from the current cluster — confirm in `generate_job_reschedule_plan`.
    Refresh,
}
2335
/// How a reschedule should treat a streaming job's resource group.
#[derive(Debug, Clone)]
pub enum JobResourceGroupTarget {
    /// Set the job's resource group. NOTE(review): `None` presumably clears the
    /// job-specific group — confirm against the code that applies it.
    Update(Option<String>),
    /// Leave the job's resource group unchanged.
    Keep,
}
2341
/// The requested parallelism and resource-group changes for a single job.
#[derive(Debug, Clone)]
pub struct JobRescheduleTarget {
    pub parallelism: JobParallelismTarget,
    pub resource_group: JobResourceGroupTarget,
}
2347
/// Input to reschedule-plan generation: one target per job.
#[derive(Debug)]
pub struct JobReschedulePolicy {
    /// Keyed by job id (`trigger_parallelism_control` builds the keys as `job_id as u32`).
    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
}
2352
/// Final updates to apply in `post_collect`, i.e. once the reschedule command's barrier
/// has been collected.
#[derive(Debug, Clone)]
pub struct JobReschedulePostUpdates {
    /// New parallelism setting per job; `analyze_reschedule_plan` may add entries.
    pub parallelism_updates: HashMap<TableId, TableParallelism>,
    /// New resource group per object.
    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
}
2359
/// A generated reschedule plan, consumed by `GlobalStreamManager::reschedule_actors`.
#[derive(Debug)]
pub struct JobReschedulePlan {
    /// Worker-level reschedule for each fragment to be moved or resized.
    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
    /// Updates to apply once the reschedule command has been collected.
    pub post_updates: JobReschedulePostUpdates,
}
2365
2366impl GlobalStreamManager {
2367    #[await_tree::instrument("acquire_reschedule_read_guard")]
2368    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
2369        self.scale_controller.reschedule_lock.read().await
2370    }
2371
2372    #[await_tree::instrument("acquire_reschedule_write_guard")]
2373    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
2374        self.scale_controller.reschedule_lock.write().await
2375    }
2376
2377    /// The entrypoint of rescheduling actors.
2378    ///
2379    /// Used by:
2380    /// - The directly exposed low-level API `risingwave_meta_service::scale_service::ScaleService` (`risectl meta reschedule`)
2381    /// - High-level parallelism control API
2382    ///     * manual `ALTER [TABLE | INDEX | MATERIALIZED VIEW | SINK] SET PARALLELISM`
2383    ///     * automatic parallelism control for [`TableParallelism::Adaptive`] when worker nodes changed
2384    pub async fn reschedule_actors(
2385        &self,
2386        database_id: DatabaseId,
2387        plan: JobReschedulePlan,
2388        options: RescheduleOptions,
2389    ) -> MetaResult<()> {
2390        let JobReschedulePlan {
2391            reschedules,
2392            mut post_updates,
2393        } = plan;
2394
2395        let reschedule_fragment = self
2396            .scale_controller
2397            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
2398            .await?;
2399
2400        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
2401
2402        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
2403            .iter()
2404            .flat_map(|(_, reschedule)| {
2405                reschedule
2406                    .upstream_fragment_dispatcher_ids
2407                    .iter()
2408                    .map(|(fragment_id, _)| *fragment_id)
2409                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
2410            })
2411            .collect();
2412
2413        let fragment_actors = up_down_stream_fragment
2414            .iter()
2415            .map(|fragment_id| {
2416                self.metadata_manager
2417                    .get_running_actors_of_fragment(*fragment_id)
2418                    .map(|actor_ids| (*fragment_id, actor_ids))
2419            })
2420            .collect::<Result<_, _>>()?;
2421
2422        let command = Command::RescheduleFragment {
2423            reschedules: reschedule_fragment,
2424            fragment_actors,
2425            post_updates,
2426        };
2427
2428        let _guard = self.source_manager.pause_tick().await;
2429
2430        self.barrier_scheduler
2431            .run_command(database_id, command)
2432            .await?;
2433
2434        tracing::info!("reschedule done");
2435
2436        Ok(())
2437    }
2438
2439    /// When new worker nodes joined, or the parallelism of existing worker nodes changed,
2440    /// examines if there are any jobs can be scaled, and scales them if found.
2441    ///
2442    /// This method will iterate over all `CREATED` jobs, and can be repeatedly called.
2443    ///
2444    /// Returns
2445    /// - `Ok(false)` if no jobs can be scaled;
2446    /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled.
2447    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
2448        tracing::info!("trigger parallelism control");
2449
2450        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
2451
2452        let background_streaming_jobs = self
2453            .metadata_manager
2454            .list_background_creating_jobs()
2455            .await?;
2456
2457        let skipped_jobs = if !background_streaming_jobs.is_empty() {
2458            let jobs = self
2459                .scale_controller
2460                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
2461                .await?;
2462
2463            tracing::info!(
2464                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
2465                background_streaming_jobs,
2466                jobs
2467            );
2468
2469            jobs
2470        } else {
2471            HashSet::new()
2472        };
2473
2474        let job_ids: HashSet<_> = {
2475            let streaming_parallelisms = self
2476                .metadata_manager
2477                .catalog_controller
2478                .get_all_streaming_parallelisms()
2479                .await?;
2480
2481            streaming_parallelisms
2482                .into_iter()
2483                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
2484                .map(|(table_id, _)| table_id)
2485                .collect()
2486        };
2487
2488        let workers = self
2489            .metadata_manager
2490            .cluster_controller
2491            .list_active_streaming_workers()
2492            .await?;
2493
2494        let schedulable_worker_ids: BTreeSet<_> = workers
2495            .iter()
2496            .filter(|worker| {
2497                !worker
2498                    .property
2499                    .as_ref()
2500                    .map(|p| p.is_unschedulable)
2501                    .unwrap_or(false)
2502            })
2503            .map(|worker| worker.id as WorkerId)
2504            .collect();
2505
2506        if job_ids.is_empty() {
2507            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
2508            return Ok(false);
2509        }
2510
2511        let batch_size = match self.env.opts.parallelism_control_batch_size {
2512            0 => job_ids.len(),
2513            n => n,
2514        };
2515
2516        tracing::info!(
2517            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
2518            job_ids.len(),
2519            batch_size,
2520            schedulable_worker_ids
2521        );
2522
2523        let batches: Vec<_> = job_ids
2524            .into_iter()
2525            .chunks(batch_size)
2526            .into_iter()
2527            .map(|chunk| chunk.collect_vec())
2528            .collect();
2529
2530        let mut reschedules = None;
2531
2532        for batch in batches {
2533            let targets: HashMap<_, _> = batch
2534                .into_iter()
2535                .map(|job_id| {
2536                    (
2537                        job_id as u32,
2538                        JobRescheduleTarget {
2539                            parallelism: JobParallelismTarget::Refresh,
2540                            resource_group: JobResourceGroupTarget::Keep,
2541                        },
2542                    )
2543                })
2544                .collect();
2545
2546            let plan = self
2547                .scale_controller
2548                .generate_job_reschedule_plan(JobReschedulePolicy { targets }, false)
2549                .await?;
2550
2551            if !plan.reschedules.is_empty() {
2552                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
2553                reschedules = Some(plan);
2554                break;
2555            }
2556        }
2557
2558        let Some(plan) = reschedules else {
2559            tracing::info!("no reschedule plan generated");
2560            return Ok(false);
2561        };
2562
2563        // todo
2564        for (database_id, reschedules) in self
2565            .metadata_manager
2566            .split_fragment_map_by_database(plan.reschedules)
2567            .await?
2568        {
2569            self.reschedule_actors(
2570                database_id,
2571                JobReschedulePlan {
2572                    reschedules,
2573                    post_updates: plan.post_updates.clone(),
2574                },
2575                RescheduleOptions {
2576                    resolve_no_shuffle_upstream: false,
2577                    skip_create_new_actors: false,
2578                },
2579            )
2580            .await?;
2581        }
2582
2583        Ok(true)
2584    }
2585
2586    /// Handles notification of worker node activation and deletion, and triggers parallelism control.
2587    async fn run(&self, mut shutdown_rx: Receiver<()>) {
2588        tracing::info!("starting automatic parallelism control monitor");
2589
2590        let check_period =
2591            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
2592
2593        let mut ticker = tokio::time::interval_at(
2594            Instant::now()
2595                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
2596            check_period,
2597        );
2598        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2599
2600        // waiting for the first tick
2601        ticker.tick().await;
2602
2603        let (local_notification_tx, mut local_notification_rx) =
2604            tokio::sync::mpsc::unbounded_channel();
2605
2606        self.env
2607            .notification_manager()
2608            .insert_local_sender(local_notification_tx);
2609
2610        let worker_nodes = self
2611            .metadata_manager
2612            .list_active_streaming_compute_nodes()
2613            .await
2614            .expect("list active streaming compute nodes");
2615
2616        let mut worker_cache: BTreeMap<_, _> = worker_nodes
2617            .into_iter()
2618            .map(|worker| (worker.id, worker))
2619            .collect();
2620
2621        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();
2622
2623        let mut should_trigger = false;
2624
2625        loop {
2626            tokio::select! {
2627                biased;
2628
2629                _ = &mut shutdown_rx => {
2630                    tracing::info!("Stream manager is stopped");
2631                    break;
2632                }
2633
2634                _ = ticker.tick(), if should_trigger => {
2635                    let include_workers = worker_cache.keys().copied().collect_vec();
2636
2637                    if include_workers.is_empty() {
2638                        tracing::debug!("no available worker nodes");
2639                        should_trigger = false;
2640                        continue;
2641                    }
2642
2643                    match self.trigger_parallelism_control().await {
2644                        Ok(cont) => {
2645                            should_trigger = cont;
2646                        }
2647                        Err(e) => {
2648                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
2649                            ticker.reset();
2650                        }
2651                    }
2652                }
2653
2654                notification = local_notification_rx.recv() => {
2655                    let notification = notification.expect("local notification channel closed in loop of stream manager");
2656
2657                    // Only maintain the cache for streaming compute nodes.
2658                    let worker_is_streaming_compute = |worker: &WorkerNode| {
2659                        worker.get_type() == Ok(WorkerType::ComputeNode)
2660                            && worker.property.as_ref().unwrap().is_streaming
2661                    };
2662
2663                    match notification {
2664                        LocalNotification::SystemParamsChange(reader) => {
2665                            let new_strategy = reader.adaptive_parallelism_strategy();
2666                            if new_strategy != previous_adaptive_parallelism_strategy {
2667                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
2668                                should_trigger = true;
2669                                previous_adaptive_parallelism_strategy = new_strategy;
2670                            }
2671                        }
2672                        LocalNotification::WorkerNodeActivated(worker) => {
2673                            if !worker_is_streaming_compute(&worker) {
2674                                continue;
2675                            }
2676
2677                            tracing::info!(worker = worker.id, "worker activated notification received");
2678
2679                            let prev_worker = worker_cache.insert(worker.id, worker.clone());
2680
2681                            match prev_worker {
2682                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism()  => {
2683                                    tracing::info!(worker = worker.id, "worker parallelism changed");
2684                                    should_trigger = true;
2685                                }
2686                                Some(prev_worker) if  prev_worker.resource_group() != worker.resource_group()  => {
2687                                    tracing::info!(worker = worker.id, "worker label changed");
2688                                    should_trigger = true;
2689                                }
2690                                None => {
2691                                    tracing::info!(worker = worker.id, "new worker joined");
2692                                    should_trigger = true;
2693                                }
2694                                _ => {}
2695                            }
2696                        }
2697
2698                        // Since our logic for handling passive scale-in is within the barrier manager,
2699                        // there’s not much we can do here. All we can do is proactively remove the entries from our cache.
2700                        LocalNotification::WorkerNodeDeleted(worker) => {
2701                            if !worker_is_streaming_compute(&worker) {
2702                                continue;
2703                            }
2704
2705                            match worker_cache.remove(&worker.id) {
2706                                Some(prev_worker) => {
2707                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
2708                                }
2709                                None => {
2710                                    tracing::warn!(worker = worker.id, "worker not found in stream manager cache, but it was removed");
2711                                }
2712                            }
2713                        }
2714
2715                        _ => {}
2716                    }
2717                }
2718            }
2719        }
2720    }
2721
2722    pub fn start_auto_parallelism_monitor(
2723        self: Arc<Self>,
2724    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
2725        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
2726        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
2727        let join_handle = tokio::spawn(async move {
2728            self.run(shutdown_rx).await;
2729        });
2730
2731        (join_handle, shutdown_tx)
2732    }
2733}