risingwave_meta/stream/scale.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, min};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use num_integer::Integer;
use num_traits::abs;
use risingwave_common::bail;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::ActorMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::FragmentWorkerSlotMappings;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{self, State};
use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule};
use crate::controller::scale::RescheduleWorkingSet;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{
    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
};
use crate::serving::{
    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
};
use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
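    /// Per-worker actor-count deltas: positive values add actors on a worker, negative values
    /// remove them. An illustrative sketch: `{worker 1: +2, worker 2: -1}` adds two actors on
    /// worker 1 and removes one from worker 2.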
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}

pub struct CustomFragmentInfo {
    pub job_id: u32,
    pub fragment_id: u32,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub state_table_ids: Vec<u32>,
    pub node: StreamNode,
    pub actor_template: StreamActorWithDispatchers,
    pub actors: Vec<CustomActorInfo>,
}

#[derive(Default, Clone)]
pub struct CustomActorInfo {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub dispatcher: Vec<Dispatcher>,
    /// `None` if singleton.
    pub vnode_bitmap: Option<Bitmap>,
}

use educe::Educe;
use futures::future::try_join_all;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_meta_model::DispatcherType;
use risingwave_pb::stream_plan::stream_node::NodeBody;

use super::SourceChange;
use crate::controller::id::IdCategory;
use crate::controller::utils::filter_workers_by_resource_group;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_impl;

// The debug implementation is arbitrary. Just used in debug logs.
#[derive(Educe)]
#[educe(Debug)]
pub struct RescheduleContext {
    /// Meta information for all Actors
    #[educe(Debug(ignore))]
    actor_map: HashMap<ActorId, CustomActorInfo>,
    /// Status of all Actors, used to find the location of the `Actor`
    actor_status: BTreeMap<ActorId, WorkerId>,
    /// Meta information of all `Fragment`, used to find the `Fragment`'s `Actor`
    #[educe(Debug(ignore))]
    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
    /// Fragments with `StreamSource`
    stream_source_fragment_ids: HashSet<FragmentId>,
    /// Fragments with `StreamSourceBackfill` and the corresponding upstream source fragment
    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
    /// Target fragments in `NoShuffle` relation
    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
    /// Source fragments in `NoShuffle` relation
    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
    // index for dispatcher type from upstream fragment to downstream fragment
    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    fragment_upstreams: HashMap<
        risingwave_meta_model::FragmentId,
        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
    >,
}

impl RescheduleContext {
    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
        self.actor_status
            .get(actor_id)
            .cloned()
            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
    }
}

/// This function provides a simple balancing method. The specific process is as follows:
///
/// 1. Calculate the number of target actors, then the average vnode count and the remainder;
///    use the average as the expected value.
///
/// 2. Partition the actors into those to be removed and those to be retained, and sort each
///    group from largest to smallest (by the number of virtual nodes held).
///
/// 3. Calculate their balances: 1) for actors to be removed, the balance is the number of
///    virtual nodes per actor; 2) for retained actors, it is the number of virtual nodes minus
///    the expected value; 3) for newly created actors, it is minus the expected value (always
///    negative).
///
/// 4. Allocate the remainder, giving priority to newly created actors.
///
/// 5. Merge the removed, retained and created actors into a queue, with the head of the queue
///    as the source, and move virtual nodes to the destination at the tail of the queue.
///
/// This can handle scale-in, scale-out, migration, and simultaneous scaling with as much affinity
/// as possible.
///
/// Note that this function can only rebalance actors whose `vnode_bitmap` is not `None`; for a
/// `Fragment` of `FragmentDistributionType::Single`, calling it will fail an assertion, so such
/// fragments should be skipped by the caller.
///
/// The return value is the bitmap distribution after scaling, covering all virtual node indexes.
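///
/// # Example
///
/// An illustrative sketch of the arithmetic in steps 1 and 4: with 256 virtual nodes and a
/// target of 3 actors, `expected = 85` and `remain = 1`, so two actors end up with 85 vnodes
/// each and one actor (a newly created one first, if any) ends up with 86.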
pub fn rebalance_actor_vnode(
    actors: &[CustomActorInfo],
    actors_to_remove: &BTreeSet<ActorId>,
    actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

    assert!(actors.len() >= actors_to_remove.len());

    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
    assert!(target_actor_count > 0);

    // `vnode_bitmap` must be set on distributed fragments.
    let vnode_count = actors[0]
        .vnode_bitmap
        .as_ref()
        .expect("vnode bitmap unset")
        .len();

    // represents the balance of each actor, used to sort later
    #[derive(Debug)]
    struct Balance {
        actor_id: ActorId,
        balance: i32,
        builder: BitmapBuilder,
    }
    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);

    tracing::debug!(
        "expected {}, remain {}, prev actors {}, target actors {}",
        expected,
        remain,
        actors.len(),
        target_actor_count,
    );

    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
        .iter()
        .map(|actor| {
            (
                actor.actor_id as ActorId,
                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
            )
        })
        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));

    let order_by_bitmap_desc =
        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
            bitmap_a
                .count_ones()
                .cmp(&bitmap_b.count_ones())
                .reverse()
                .then(id_a.cmp(id_b))
        };
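    // e.g., actors holding (86, 85, 85) vnodes sort with the 86-vnode actor first; ties break
    // by ascending actor id (an illustrative note on the comparator above).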

    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
        let mut builder = BitmapBuilder::default();
        builder.append_bitmap(bitmap);
        builder
    };

    let (prev_expected, _) = vnode_count.div_rem(&actors.len());

    let prev_remain = removed
        .iter()
        .map(|(_, bitmap)| {
            assert!(bitmap.count_ones() >= prev_expected);
            bitmap.count_ones() - prev_expected
        })
        .sum::<usize>();

    removed.sort_by(order_by_bitmap_desc);
    rest.sort_by(order_by_bitmap_desc);

    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
        actor_id,
        balance: bitmap.count_ones() as i32,
        builder: builder_from_bitmap(&bitmap),
    });

    let mut rest_balances = rest
        .into_iter()
        .map(|(actor_id, bitmap)| Balance {
            actor_id,
            balance: bitmap.count_ones() as i32 - expected as i32,
            builder: builder_from_bitmap(&bitmap),
        })
        .collect_vec();

    let mut created_balances = actors_to_create
        .iter()
        .map(|actor_id| Balance {
            actor_id: *actor_id,
            balance: -(expected as i32),
            builder: BitmapBuilder::zeroed(vnode_count),
        })
        .collect_vec();

    for balance in created_balances
        .iter_mut()
        .rev()
        .take(prev_remain)
        .chain(rest_balances.iter_mut())
    {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    // consume the rest of `remain`
    for balance in &mut created_balances {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    assert_eq!(remain, 0);

    let mut v: VecDeque<_> = removed_balances
        .chain(rest_balances)
        .chain(created_balances)
        .collect();

    // We return the full bitmaps after rebalancing; to return only the changed actors,
    // filter out those with balance = 0 here.
    let mut result = HashMap::with_capacity(target_actor_count);

    for balance in &v {
        tracing::debug!(
            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
            balance.actor_id,
            balance.balance,
            actors_to_remove.contains(&balance.actor_id),
            actors_to_create.contains(&balance.actor_id)
        );
    }

    while !v.is_empty() {
        if v.len() == 1 {
            let single = v.pop_front().unwrap();
            assert_eq!(single.balance, 0);
            if !actors_to_remove.contains(&single.actor_id) {
                result.insert(single.actor_id, single.builder.finish());
            }

            continue;
        }

        let mut src = v.pop_front().unwrap();
        let mut dst = v.pop_back().unwrap();

        let n = min(abs(src.balance), abs(dst.balance));

        let mut moved = 0;
        for idx in (0..vnode_count).rev() {
            if moved >= n {
                break;
            }

            if src.builder.is_set(idx) {
                src.builder.set(idx, false);
                assert!(!dst.builder.is_set(idx));
                dst.builder.set(idx, true);
                moved += 1;
            }
        }

        src.balance -= n;
        dst.balance += n;

        if src.balance != 0 {
            v.push_front(src);
        } else if !actors_to_remove.contains(&src.actor_id) {
            result.insert(src.actor_id, src.builder.finish());
        }

        if dst.balance != 0 {
            v.push_back(dst);
        } else {
            result.insert(dst.actor_id, dst.builder.finish());
        }
    }

    result
}

#[derive(Debug, Clone, Copy)]
pub struct RescheduleOptions {
    /// Whether to resolve the upstream of `NoShuffle` when scaling. It checks whether all
    /// reschedules in the no-shuffle dependency tree correspond, and rewrites them to the root
    /// of that tree.
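    ///
    /// For example (an illustrative sketch): a reschedule submitted against a `Chain` fragment
    /// is rewritten to target its root `Materialize` fragment, so the whole no-shuffle
    /// dependency tree is rescheduled together.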
    pub resolve_no_shuffle_upstream: bool,

    /// Whether to skip creating new actors. If true, the scaling-out actors will not be created.
    pub skip_create_new_actors: bool,
}

pub type ScaleControllerRef = Arc<ScaleController>;

pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    /// We acquire this lock during DDL to prevent scaling operations on jobs in the creating
    /// state, e.g., a MV cannot be rescheduled during foreground backfill.
    pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    pub async fn integrity_check(&self) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .integrity_check()
            .await
    }

    /// Build the context for rescheduling and do some validation for the request.
    async fn build_reschedule_context(
        &self,
        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<RescheduleContext> {
        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .map(|worker_node| (worker_node.id as _, worker_node))
            .collect();

        if worker_nodes.is_empty() {
            bail!("no available compute node in the cluster");
        }

        // Check if we are trying to move a fragment to a node marked as unschedulable
        let unschedulable_worker_ids: HashSet<_> = worker_nodes
            .values()
            .filter(|w| {
                w.property
                    .as_ref()
                    .map(|property| property.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        for (fragment_id, reschedule) in &*reschedule {
            for (worker_id, change) in &reschedule.worker_actor_diff {
                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
                    bail!(
                        "unable to move fragment {} to unschedulable worker {}",
                        fragment_id,
                        worker_id
                    );
                }
            }
        }

        // FIXME: the same as another place calling `list_table_fragments` in scaling.
        // Index for StreamActor
        let mut actor_map = HashMap::new();
        // Index for Fragment
        let mut fragment_map = HashMap::new();
        // Index for actor status, including actor's worker id
        let mut actor_status = BTreeMap::new();
        let mut fragment_state = HashMap::new();
        let mut fragment_to_table = HashMap::new();

        fn fulfill_index_by_fragment_ids(
            actor_map: &mut HashMap<u32, CustomActorInfo>,
            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
            actor_status: &mut BTreeMap<ActorId, WorkerId>,
            fragment_state: &mut HashMap<FragmentId, State>,
            fragment_to_table: &mut HashMap<FragmentId, TableId>,
            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
            actors: HashMap<ActorId, actor::Model>,
            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
        ) {
            let mut fragment_actors: HashMap<
                risingwave_meta_model::FragmentId,
                Vec<CustomActorInfo>,
            > = HashMap::new();

            let mut expr_contexts = HashMap::new();
            for (
                _,
                actor::Model {
                    actor_id,
                    fragment_id,
                    status: _,
                    splits: _,
                    worker_id,
                    vnode_bitmap,
                    expr_context,
                    ..
                },
            ) in actors
            {
                let dispatchers = actor_dispatchers
                    .remove(&(actor_id as _))
                    .unwrap_or_default();

                let actor_info = CustomActorInfo {
                    actor_id: actor_id as _,
                    fragment_id: fragment_id as _,
                    dispatcher: dispatchers,
                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
                };

                actor_map.insert(actor_id as _, actor_info.clone());

                fragment_actors
                    .entry(fragment_id as _)
                    .or_default()
                    .push(actor_info);

                actor_status.insert(actor_id as _, worker_id as WorkerId);

                expr_contexts.insert(actor_id as u32, expr_context);
            }

            for (
                _,
                fragment::Model {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    stream_node,
                    state_table_ids,
                    ..
                },
            ) in fragments
            {
                let actors = fragment_actors
                    .remove(&(fragment_id as _))
                    .unwrap_or_default();

                let CustomActorInfo {
                    actor_id,
                    fragment_id,
                    dispatcher,
                    vnode_bitmap,
                } = actors.first().unwrap().clone();

                let (related_job, job_definition) =
                    related_jobs.get(&job_id).expect("job not found");

                let fragment = CustomFragmentInfo {
                    job_id: job_id as _,
                    fragment_id: fragment_id as _,
                    fragment_type_mask: fragment_type_mask.into(),
                    distribution_type: distribution_type.into(),
                    state_table_ids: state_table_ids.into_u32_array(),
                    node: stream_node.to_protobuf(),
                    actor_template: (
                        StreamActor {
                            actor_id,
                            fragment_id: fragment_id as _,
                            vnode_bitmap,
                            mview_definition: job_definition.to_owned(),
                            expr_context: expr_contexts
                                .get(&actor_id)
                                .cloned()
                                .map(|expr_context| expr_context.to_protobuf()),
                        },
                        dispatcher,
                    ),
                    actors,
                };

                fragment_map.insert(fragment_id as _, fragment);

                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

                fragment_state.insert(
                    fragment_id,
                    table_fragments::PbState::from(related_job.job_status),
                );
            }
        }
        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
        let working_set = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_fragments(fragment_ids)
            .await?;

        fulfill_index_by_fragment_ids(
            &mut actor_map,
            &mut fragment_map,
            &mut actor_status,
            &mut fragment_state,
            &mut fragment_to_table,
            working_set.fragments,
            working_set.actors,
            working_set.actor_dispatchers,
            working_set.related_jobs,
        );

        // NoShuffle relation index
        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        Self::build_no_shuffle_relation_index(
            &actor_map,
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
        );

        if options.resolve_no_shuffle_upstream {
            let original_reschedule_keys = reschedule.keys().cloned().collect();

            Self::resolve_no_shuffle_upstream_fragments(
                reschedule,
                &no_shuffle_source_fragment_ids,
                &no_shuffle_target_fragment_ids,
                &working_set.fragment_upstreams,
            )?;

            if !table_parallelisms.is_empty() {
                // Re-walk the NO_SHUFFLE dependencies to determine from which downstream tables
                // the custom modifications of the table have been propagated.
                Self::resolve_no_shuffle_upstream_tables(
                    original_reschedule_keys,
                    &no_shuffle_source_fragment_ids,
                    &no_shuffle_target_fragment_ids,
                    &fragment_to_table,
                    &working_set.fragment_upstreams,
                    table_parallelisms,
                )?;
            }
        }

        let mut fragment_dispatcher_map = HashMap::new();
        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);

        let mut stream_source_fragment_ids = HashSet::new();
        let mut stream_source_backfill_fragment_ids = HashMap::new();
        let mut no_shuffle_reschedule = HashMap::new();
        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
            let fragment = fragment_map
                .get(fragment_id)
                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;

            // Check if the rescheduling is supported.
            match fragment_state[fragment_id] {
                table_fragments::State::Unspecified => unreachable!(),
                state @ table_fragments::State::Initial => {
                    bail!(
                        "the materialized view of fragment {fragment_id} is in state {}",
                        state.as_str_name()
                    )
                }
                state @ table_fragments::State::Creating => {
                    let stream_node = &fragment.node;

                    let mut is_reschedulable = true;
                    visit_stream_node_cont(stream_node, |body| {
                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
                            if !node.stream_scan_type().is_reschedulable() {
                                is_reschedulable = false;

                                // fail fast
                                return false;
                            }

                            // continue visiting
                            return true;
                        }

                        // continue visiting
                        true
                    });

                    if !is_reschedulable {
                        bail!(
                            "the materialized view of fragment {fragment_id} is in state {}",
                            state.as_str_name()
                        )
                    }
                }
                table_fragments::State::Created => {}
            }

            if no_shuffle_target_fragment_ids.contains(fragment_id) {
                bail!(
                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
                );
            }

            // The NoShuffle relation (e.g. Materialize and Chain) needs special treatment:
            // since the upstream and downstream of NoShuffle are always in 1-1 correspondence,
            // we clone the reschedule plan to the downstream of all cascading relations.
            if no_shuffle_source_fragment_ids.contains(fragment_id) {
                // This fragment is a NoShuffle's upstream.
                let mut queue: VecDeque<_> = fragment_dispatcher_map
                    .get(fragment_id)
                    .unwrap()
                    .keys()
                    .cloned()
                    .collect();

                while let Some(downstream_id) = queue.pop_front() {
                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
                        continue;
                    }

                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
                    {
                        let no_shuffle_downstreams = downstream_fragments
                            .iter()
                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                            .map(|(fragment_id, _)| fragment_id);

                        queue.extend(no_shuffle_downstreams.copied());
                    }

                    no_shuffle_reschedule.insert(
                        downstream_id,
                        WorkerReschedule {
                            worker_actor_diff: worker_actor_diff.clone(),
                        },
                    );
                }
            }

            if fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::Source)
                && fragment.node.find_stream_source().is_some()
            {
                stream_source_fragment_ids.insert(*fragment_id);
            }

            // Check if the reschedule plan is valid.
            let current_worker_ids = fragment
                .actors
                .iter()
                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
                .collect::<HashSet<_>>();

            for (removed, change) in worker_actor_diff {
                if !current_worker_ids.contains(removed) && change.is_negative() {
                    bail!(
                        "no actor on the worker {} of fragment {}",
                        removed,
                        fragment_id
                    );
                }
            }

            let added_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_positive())
                .cloned()
                .map(|change| change as usize)
                .sum();

            let removed_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_negative())
                .cloned()
                .map(|v| v.unsigned_abs())
                .sum();
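            // e.g., an illustrative diff of `{worker 1: +2, worker 2: -1}` yields
            // `added_actor_count = 2` and `removed_actor_count = 1`.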

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
                        bail!("can't remove all actors from fragment {}", fragment_id);
                    }
                }
                FragmentDistributionType::Single => {
                    if added_actor_count != removed_actor_count {
                        bail!("single distribution fragment only supports migration");
                    }
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        if !no_shuffle_reschedule.is_empty() {
            tracing::info!(
                "reschedule plan rewritten with NoShuffle reschedule {:?}",
                no_shuffle_reschedule
            );

            for noshuffle_downstream in no_shuffle_reschedule.keys() {
                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
                // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SourceScan)
                {
                    let stream_node = &fragment.node;
                    if let Some((_source_id, upstream_source_fragment_id)) =
                        stream_node.find_source_backfill()
                    {
                        stream_source_backfill_fragment_ids
                            .insert(fragment.fragment_id, upstream_source_fragment_id);
                    }
                }
            }
        }

        // Modifications for NoShuffle downstream.
        reschedule.extend(no_shuffle_reschedule.into_iter());

        Ok(RescheduleContext {
            actor_map,
            actor_status,
            fragment_map,
            stream_source_fragment_ids,
            stream_source_backfill_fragment_ids,
            no_shuffle_target_fragment_ids,
            no_shuffle_source_fragment_ids,
            fragment_dispatcher_map,
            fragment_upstreams: working_set.fragment_upstreams,
        })
    }

    /// From the high-level [`WorkerReschedule`] to the low-level reschedule plan [`Reschedule`].
    ///
    /// Returns `(reschedule_fragment, applied_reschedules)`:
    /// - `reschedule_fragment`: the generated reschedule plan
    /// - `applied_reschedules`: the changes that need to be updated to the meta store (`pre_apply_reschedules`, only for V1).
    ///
    /// In the [normal process of scaling](`GlobalStreamManager::reschedule_actors`), we use the returned values to
    /// build a [`Command::RescheduleFragment`], which then flows through the barrier mechanism to perform scaling.
    /// The meta store is updated after the barrier is collected.
    ///
    /// During recovery, we don't need the barrier mechanism, and can directly use the returned values to update meta.
    pub(crate) async fn analyze_reschedule_plan(
        &self,
        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
        let ctx = self
            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
            .await?;
        tracing::debug!("reschedule context: {:#?}", ctx);
        let reschedules = reschedules;

        // Here, the plan for both upstream and downstream of the NO_SHUFFLE Fragment should already have been populated.

        // Index of actors to create/remove
        // Fragment Id => ( Actor Id => Worker Id )
        let (fragment_actors_to_remove, fragment_actors_to_create) =
            self.arrange_reschedules(&reschedules, &ctx)?;

        let mut fragment_actor_bitmap = HashMap::new();
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                // skipping chain fragment, we need to clone the upstream materialize fragment's
                // mapping later
                continue;
            }

            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let actors_to_remove = fragment_actors_to_remove
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            match fragment.distribution_type {
                FragmentDistributionType::Single => {
                    // Skip re-balancing action for single distribution (always None)
                    fragment_actor_bitmap
                        .insert(fragment.fragment_id as FragmentId, Default::default());
                }
                FragmentDistributionType::Hash => {
                    let actor_vnode = rebalance_actor_vnode(
                        &fragment.actors,
                        &actors_to_remove,
                        &actors_to_create,
                    );

                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
                }

                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        // Index for fragment -> { actor -> worker_id } after reschedule.
        // Since we need to maintain the upstream and downstream relationships of NoShuffle,
        // we first organize the actor distribution after rescheduling.
        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
        for fragment_id in reschedules.keys() {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
            let mut new_actor_ids = BTreeMap::new();
            for actor in &fragment.actors {
                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
                    && actors_to_remove.contains_key(&actor.actor_id)
                {
                    continue;
                }
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
            }

            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
                for (actor_id, worker_id) in actors_to_create {
                    new_actor_ids.insert(*actor_id, *worker_id);
                }
            }

            assert!(
                !new_actor_ids.is_empty(),
                "should be at least one actor in fragment {} after rescheduling",
                fragment_id
            );

            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
        }

        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;

        // To maintain consistency with the original structure, the upstream and downstream actors
        // of NoShuffle need to be in the same worker slot and hold the same virtual nodes.
        // Since the actors of each fragment are sorted by id on every worker, after the upstream
        // re-balancing we can identify the corresponding upstream actor of each NO_SHUFFLE pair.
        // NOTE: There should be more asserts here to ensure correctness.
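        // e.g. (an illustrative sketch): if Materialize actors {1, 2} feed Chain actors {11, 12}
        // via NoShuffle on the same workers, sorting by id pairs 1 -> 11 and 2 -> 12, and each
        // downstream actor copies its upstream actor's vnode bitmap.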
        fn arrange_no_shuffle_relation(
            ctx: &RescheduleContext,
            fragment_id: &FragmentId,
            upstream_fragment_id: &FragmentId,
            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        ) {
            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                return;
            }

            let fragment = &ctx.fragment_map[fragment_id];

            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];

            // build actor group map
            for upstream_actor in &upstream_fragment.actors {
                for dispatcher in &upstream_actor.dispatcher {
                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
                        let downstream_actor_id =
                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();

                        // upstream is root
                        if !ctx
                            .no_shuffle_target_fragment_ids
                            .contains(upstream_fragment_id)
                        {
                            actor_group_map.insert(
                                upstream_actor.actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                            actor_group_map.insert(
                                downstream_actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                        } else {
                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];

                            actor_group_map.insert(downstream_actor_id, root_actor_id);
                        }
                    }
                }
            }

            // If the upstream is a Singleton Fragment, there will be no Bitmap changes
            let upstream_fragment_bitmap = fragment_updated_bitmap
                .get(upstream_fragment_id)
                .cloned()
                .unwrap_or_default();

            // Question: Is it possible to have a Hash Distribution Fragment whose Actor bitmaps remain unchanged?
            if upstream_fragment.distribution_type == FragmentDistributionType::Single {
                assert!(
                    upstream_fragment_bitmap.is_empty(),
                    "single fragment should have no bitmap updates"
                );
            }

            let upstream_fragment_actor_map = fragment_actors_after_reschedule
                .get(upstream_fragment_id)
                .cloned()
                .unwrap();

            let fragment_actor_map = fragment_actors_after_reschedule
                .get(fragment_id)
                .cloned()
                .unwrap();

            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            // first, find the existing actor bitmaps and copy them
            let mut fragment_bitmap = HashMap::new();

            for (actor_id, worker_id) in &fragment_actor_map {
                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
                    let root_bitmap = fragment_updated_bitmap
                        .get(root_fragment)
                        .expect("root fragment bitmap not found")
                        .get(root_actor_id)
                        .cloned()
                        .expect("root actor bitmap not found");

                    // Copy the bitmap
                    fragment_bitmap.insert(*actor_id, root_bitmap);

                    no_shuffle_upstream_actor_map
                        .entry(*actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, *root_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(*root_actor_id)
                        .or_default()
                        .insert(*fragment_id, *actor_id);
                } else {
                    worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            for (actor_id, worker_id) in &upstream_fragment_actor_map {
                if !actor_group_map.contains_key(actor_id) {
                    upstream_worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            // then, find the rest of the actors and copy the bitmap
            for (worker_id, actor_ids) in worker_reverse_index {
                let upstream_actor_ids = upstream_worker_reverse_index
                    .get(&worker_id)
                    .unwrap()
                    .clone();

                assert_eq!(actor_ids.len(), upstream_actor_ids.len());

                for (actor_id, upstream_actor_id) in actor_ids
                    .into_iter()
                    .zip_eq_debug(upstream_actor_ids.into_iter())
                {
                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
                        None => {
                            // single fragment should have no bitmap updates (same as upstream)
                            assert_eq!(
                                upstream_fragment.distribution_type,
                                FragmentDistributionType::Single
                            );
                        }
                        Some(bitmap) => {
                            // Copy the bitmap
                            fragment_bitmap.insert(actor_id, bitmap);
                        }
                    }

                    no_shuffle_upstream_actor_map
                        .entry(actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, upstream_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(upstream_actor_id)
                        .or_default()
                        .insert(*fragment_id, actor_id);
                }
            }

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {}
                FragmentDistributionType::Single => {
                    // single distribution should update nothing
                    assert!(fragment_bitmap.is_empty());
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }

            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
                assert_eq!(
                    e.entry.get(),
                    &e.value,
                    "bitmaps derived from different no-shuffle upstreams mismatch"
                );
            }

            // Visit downstream fragments recursively.
            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
                let no_shuffle_downstreams = downstream_fragments
                    .iter()
                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| fragment_id);

                for downstream_fragment_id in no_shuffle_downstreams {
                    arrange_no_shuffle_relation(
                        ctx,
                        downstream_fragment_id,
                        fragment_id,
                        fragment_actors_after_reschedule,
                        actor_group_map,
                        fragment_updated_bitmap,
                        no_shuffle_upstream_actor_map,
                        no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        let mut no_shuffle_upstream_actor_map = HashMap::new();
        let mut no_shuffle_downstream_actors_map = HashMap::new();
        let mut actor_group_map = HashMap::new();
        // For all roots in the upstream and downstream dependency trees of NoShuffle, recursively
        // find all correspondences
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
                && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
            {
                for downstream_fragment_id in downstream_fragments.keys() {
                    arrange_no_shuffle_relation(
                        &ctx,
                        downstream_fragment_id,
                        fragment_id,
                        &fragment_actors_after_reschedule,
                        &mut actor_group_map,
                        &mut fragment_actor_bitmap,
                        &mut no_shuffle_upstream_actor_map,
                        &mut no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        tracing::debug!("actor group map {:?}", actor_group_map);

        let mut new_created_actors = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .cloned()
                .unwrap_or_default();

            let fragment = &ctx.fragment_map[fragment_id];

            assert!(!fragment.actors.is_empty());

            for actor_to_create in &actors_to_create {
                let new_actor_id = actor_to_create.0;
                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();

                // This should be assigned before the `modify_actor_upstream_and_downstream` call,
                // because we need to use the new actor id to find the upstream and
                // downstream in the NoShuffle relationship
                new_actor.actor_id = *new_actor_id;

                Self::modify_actor_upstream_and_downstream(
                    &ctx,
                    &fragment_actors_to_remove,
                    &fragment_actors_to_create,
                    &fragment_actor_bitmap,
                    &no_shuffle_downstream_actors_map,
                    &mut new_actor,
                    &mut dispatchers,
                )?;

                if let Some(bitmap) = fragment_actor_bitmap
                    .get(fragment_id)
                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
                {
                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
                }

                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
            }
        }

        // For stream source & source backfill fragments, we need to reallocate the splits.
        // Since we are in the paused state, it's safe to reallocate them here.
        let mut fragment_actor_splits = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

            if ctx.stream_source_fragment_ids.contains(fragment_id) {
                let fragment = &ctx.fragment_map[fragment_id];

                let prev_actor_ids = fragment
                    .actors
                    .iter()
                    .map(|actor| actor.actor_id)
                    .collect_vec();

                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                let actor_splits = self
                    .source_manager
                    .migrate_splits_for_source_actors(
                        *fragment_id,
                        &prev_actor_ids,
                        &curr_actor_ids,
                    )
                    .await?;

                tracing::debug!(
                    "source actor splits: {:?}, fragment_id: {}",
                    actor_splits,
                    fragment_id
                );
                fragment_actor_splits.insert(*fragment_id, actor_splits);
            }
        }
        // We use two passes to make sure source actors are migrated first, and backfill actors are then aligned with them
1206        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
1207            for fragment_id in reschedules.keys() {
1208                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1209
1210                if let Some(upstream_source_fragment_id) =
1211                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
1212                {
1213                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1214
1215                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
1216                        *fragment_id,
1217                        *upstream_source_fragment_id,
1218                        &curr_actor_ids,
1219                        &fragment_actor_splits,
1220                        &no_shuffle_upstream_actor_map,
1221                    )?;
1222                    tracing::debug!(
1223                        "source backfill actor splits: {:?}, fragment_id: {}",
1224                        actor_splits,
1225                        fragment_id
1226                    );
1227                    fragment_actor_splits.insert(*fragment_id, actor_splits);
1228                }
1229            }
1230        }
1231
1232        // Generate fragment reschedule plan
1233        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
1234            HashMap::with_capacity(reschedules.len());
1235
1236        for (fragment_id, _) in reschedules {
1237            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();
1238
1239            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
1240                for (actor_id, worker_id) in actor_worker_maps {
1241                    actors_to_create
1242                        .entry(worker_id)
1243                        .or_default()
1244                        .push(actor_id);
1245                }
1246            }
1247
1248            let actors_to_remove = fragment_actors_to_remove
1249                .get(&fragment_id)
1250                .cloned()
1251                .unwrap_or_default()
1252                .into_keys()
1253                .collect();
1254
1255            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];
1256
1257            assert!(!actors_after_reschedule.is_empty());
1258
1259            let fragment = &ctx.fragment_map[&fragment_id];
1260
1261            let in_degree_types: HashSet<_> = ctx
1262                .fragment_upstreams
1263                .get(&(fragment_id as _))
1264                .map(|upstreams| upstreams.values())
1265                .into_iter()
1266                .flatten()
1267                .cloned()
1268                .collect();
1269
1270            let upstream_dispatcher_mapping = match fragment.distribution_type {
1271                FragmentDistributionType::Hash => {
1272                    if !in_degree_types.contains(&DispatcherType::Hash) {
1273                        None
1274                    } else {
1275                        // Changes of the bitmap must occur in the case of HashDistribution
1276                        Some(ActorMapping::from_bitmaps(
1277                            &fragment_actor_bitmap[&fragment_id],
1278                        ))
1279                    }
1280                }
1281
1282                FragmentDistributionType::Single => {
1283                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
1284                    None
1285                }
1286                FragmentDistributionType::Unspecified => unreachable!(),
1287            };
1288
1289            let mut upstream_fragment_dispatcher_set = BTreeSet::new();
1290
1291            {
1292                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
1293                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
1294                        match upstream_dispatcher_type {
1295                            DispatcherType::NoShuffle => {}
1296                            _ => {
1297                                upstream_fragment_dispatcher_set.insert((
1298                                    *upstream_fragment_id as FragmentId,
1299                                    fragment.fragment_id as DispatcherId,
1300                                ));
1301                            }
1302                        }
1303                    }
1304                }
1305            }
1306
1307            let downstream_fragment_ids = if let Some(downstream_fragments) =
1308                ctx.fragment_dispatcher_map.get(&fragment_id)
1309            {
1310                // Skip fragments' no-shuffle downstream, as there's no need to update the merger
1311                // (receiver) of a no-shuffle downstream
1312                downstream_fragments
1313                    .iter()
1314                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
1315                    .map(|(fragment_id, _)| *fragment_id)
1316                    .collect_vec()
1317            } else {
1318                vec![]
1319            };
1320
1321            let vnode_bitmap_updates = match fragment.distribution_type {
1322                FragmentDistributionType::Hash => {
1323                    let mut vnode_bitmap_updates =
1324                        fragment_actor_bitmap.remove(&fragment_id).unwrap();
1325
1326                    // Keep only the bitmaps of actors whose vnode distribution changed;
1327                    // otherwise the barrier would become very large when there are many actors.
1328                    for actor_id in actors_after_reschedule.keys() {
1329                        assert!(vnode_bitmap_updates.contains_key(actor_id));
1330
1331                        // Remove the update for actors whose bitmap did not change.
1332                        if let Some(actor) = ctx.actor_map.get(actor_id) {
1333                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();
1334
1335                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
1336                                && prev_bitmap.eq(bitmap)
1337                            {
1338                                vnode_bitmap_updates.remove(actor_id);
1339                            }
1340                        }
1341                    }
1342
1343                    vnode_bitmap_updates
1344                }
1345                FragmentDistributionType::Single => HashMap::new(),
1346                FragmentDistributionType::Unspecified => unreachable!(),
1347            };
1348
1349            let upstream_fragment_dispatcher_ids =
1350                upstream_fragment_dispatcher_set.into_iter().collect_vec();
1351
1352            let actor_splits = fragment_actor_splits
1353                .get(&fragment_id)
1354                .cloned()
1355                .unwrap_or_default();
1356
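            // For CDC backfill fragments, reassign the table snapshot splits across the
            // actors that remain after the reschedule.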
1357            let cdc_table_snapshot_split_assignment = if fragment
1358                .fragment_type_mask
1359                .contains(FragmentTypeFlag::StreamCdcScan)
1360            {
1361                assign_cdc_table_snapshot_splits_impl(
1362                    fragment.job_id,
1363                    fragment_actors_after_reschedule
1364                        .get(&fragment_id)
1365                        .unwrap()
1366                        .keys()
1367                        .copied()
1368                        .collect(),
1369                    self.env.meta_store_ref(),
1370                )
1371                .await?
1372            } else {
1373                HashMap::default()
1374            };
1375
1376            reschedule_fragment.insert(
1377                fragment_id,
1378                Reschedule {
1379                    added_actors: actors_to_create,
1380                    removed_actors: actors_to_remove,
1381                    vnode_bitmap_updates,
1382                    upstream_fragment_dispatcher_ids,
1383                    upstream_dispatcher_mapping,
1384                    downstream_fragment_ids,
1385                    actor_splits,
1386                    newly_created_actors: Default::default(),
1387                    cdc_table_snapshot_split_assignment,
1388                },
1389            );
1390        }
1391
1392        let mut fragment_created_actors = HashMap::new();
1393        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
1394            let mut created_actors = HashMap::new();
1395            for (actor_id, worker_id) in actors_to_create {
1396                let actor = new_created_actors.get(actor_id).cloned().unwrap();
1397                created_actors.insert(*actor_id, (actor, *worker_id));
1398            }
1399
1400            fragment_created_actors.insert(*fragment_id, created_actors);
1401        }
1402
1403        for (fragment_id, to_create) in fragment_created_actors {
1404            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
1405            reschedule.newly_created_actors = to_create;
1406        }
1407        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);
1408
1409        Ok(reschedule_fragment)
1410    }
1411
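    /// Converts the per-worker actor-count diffs of each `WorkerReschedule` into concrete
    /// actor sets: for each fragment, which existing actors to remove (the ones with the
    /// highest actor ids on a shrinking worker) and which freshly generated actor ids to
    /// create on which workers.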
1412    #[expect(clippy::type_complexity)]
1413    fn arrange_reschedules(
1414        &self,
1415        reschedule: &HashMap<FragmentId, WorkerReschedule>,
1416        ctx: &RescheduleContext,
1417    ) -> MetaResult<(
1418        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1419        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1420    )> {
1421        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
1422        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());
1423
1424        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
1425            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
1426
1427            // Actor Id => Worker Id
1428            let mut actors_to_remove = BTreeMap::new();
1429            let mut actors_to_create = BTreeMap::new();
1430
1431            // NOTE(important): The value must be a `BTreeSet` so that the actors on each worker are sorted in ascending order.
1432            let mut worker_to_actors = HashMap::new();
1433
1434            for actor in &fragment.actors {
1435                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
1436                worker_to_actors
1437                    .entry(worker_id)
1438                    .or_insert(BTreeSet::new())
1439                    .insert(actor.actor_id as ActorId);
1440            }
1441
1442            let decreased_actor_count = worker_actor_diff
1443                .iter()
1444                .filter(|(_, change)| change.is_negative())
1445                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));
1446
1447            for (worker_id, n) in decreased_actor_count {
1448                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
1449                    if actor_ids.len() < n {
1450                        bail!(
1451                            "illegal plan: for fragment {}, worker {} has only {} actors, but {} need to be removed",
1452                            fragment_id,
1453                            worker_id,
1454                            actor_ids.len(),
1455                            n
1456                        );
1457                    }
1458
1459                    let removed_actors: Vec<_> = actor_ids
1460                        .iter()
1461                        .skip(actor_ids.len().saturating_sub(n))
1462                        .cloned()
1463                        .collect();
1464
1465                    for actor in removed_actors {
1466                        actors_to_remove.insert(actor, *worker_id);
1467                    }
1468                }
1469            }
1470
1471            let increased_actor_count = worker_actor_diff
1472                .iter()
1473                .filter(|(_, change)| change.is_positive());
1474
1475            for (worker, n) in increased_actor_count {
1476                for _ in 0..*n {
1477                    let id = self
1478                        .env
1479                        .id_gen_manager()
1480                        .generate_interval::<{ IdCategory::Actor }>(1)
1481                        as ActorId;
1482                    actors_to_create.insert(id, *worker);
1483                }
1484            }
1485
1486            if !actors_to_remove.is_empty() {
1487                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
1488            }
1489
1490            if !actors_to_create.is_empty() {
1491                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
1492            }
1493        }
1494
1495        // Sanity check: NO_SHUFFLE upstream/downstream actor pairs must be removed together.
1496        for actors_to_remove in fragment_actors_to_remove.values() {
1497            for actor_id in actors_to_remove.keys() {
1498                let actor = ctx.actor_map.get(actor_id).unwrap();
1499                for dispatcher in &actor.dispatcher {
1500                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
1501                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");
1502
1503                        let _should_exists = fragment_actors_to_remove
1504                            .get(&(dispatcher.dispatcher_id as FragmentId))
1505                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
1506                            .get(downstream_actor_id)
1507                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
1508                    }
1509                }
1510            }
1511        }
1512
1513        Ok((fragment_actors_to_remove, fragment_actors_to_create))
1514    }
1515
1516    /// Modifies the upstream and downstream actors of the newly created actor according to
1517    /// the overall changes; used to handle cascading updates.
1518    fn modify_actor_upstream_and_downstream(
1519        ctx: &RescheduleContext,
1520        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1521        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1522        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
1523        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
1524        new_actor: &mut StreamActor,
1525        dispatchers: &mut Vec<PbDispatcher>,
1526    ) -> MetaResult<()> {
1527        // Update downstream actor ids
1528        for dispatcher in dispatchers {
1529            let downstream_fragment_id = dispatcher
1530                .downstream_actor_id
1531                .iter()
1532                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
1533                .dedup()
1534                .exactly_one()
1535                .unwrap() as FragmentId;
1536
1537            let downstream_fragment_actors_to_remove =
1538                fragment_actors_to_remove.get(&downstream_fragment_id);
1539            let downstream_fragment_actors_to_create =
1540                fragment_actors_to_create.get(&downstream_fragment_id);
1541
1542            match dispatcher.r#type() {
1543                d @ (PbDispatcherType::Hash
1544                | PbDispatcherType::Simple
1545                | PbDispatcherType::Broadcast) => {
1546                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
1547                    {
1548                        dispatcher
1549                            .downstream_actor_id
1550                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
1551                    }
1552
1553                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
1554                    {
1555                        dispatcher
1556                            .downstream_actor_id
1557                            .extend(downstream_actors_to_create.keys().cloned())
1558                    }
1559
1560                    // There should still be exactly one downstream actor.
1561                    if d == PbDispatcherType::Simple {
1562                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1563                    }
1564                }
1565                PbDispatcherType::NoShuffle => {
1566                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1567                    let downstream_actor_id = no_shuffle_downstream_actors_map
1568                        .get(&new_actor.actor_id)
1569                        .and_then(|map| map.get(&downstream_fragment_id))
1570                        .unwrap();
1571                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
1572                }
1573                PbDispatcherType::Unspecified => unreachable!(),
1574            }
1575
1576            if let Some(mapping) = dispatcher.hash_mapping.as_mut()
1577                && let Some(downstream_updated_bitmap) =
1578                    fragment_actor_bitmap.get(&downstream_fragment_id)
1579            {
1580                // If the downstream fragment scaled in or out, refresh the hash mapping.
1581                *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
1582            }
1583        }
1584
1585        Ok(())
1586    }
1587
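    /// Applies the side effects of a collected reschedule: persists the fragment changes to
    /// the meta store, refreshes the serving vnode mappings, and notifies the source
    /// manager of split reassignments and dropped actors.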
1588    #[await_tree::instrument]
1589    pub async fn post_apply_reschedule(
1590        &self,
1591        reschedules: &HashMap<FragmentId, Reschedule>,
1592        post_updates: &JobReschedulePostUpdates,
1593    ) -> MetaResult<()> {
1594        // Update fragment info after rescheduling in meta store.
1595        self.metadata_manager
1596            .post_apply_reschedules(reschedules.clone(), post_updates)
1597            .await?;
1598
1599        // Update serving fragment info after rescheduling in meta store.
1600        if !reschedules.is_empty() {
1601            let workers = self
1602                .metadata_manager
1603                .list_active_serving_compute_nodes()
1604                .await?;
1605            let streaming_parallelisms = self
1606                .metadata_manager
1607                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
1608                .await?;
1609            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
1610            let max_serving_parallelism = self
1611                .env
1612                .session_params_manager_impl_ref()
1613                .get_params()
1614                .await
1615                .batch_parallelism()
1616                .map(|p| p.get());
1617            let (upserted, failed) = serving_worker_slot_mapping.upsert(
1618                streaming_parallelisms,
1619                &workers,
1620                max_serving_parallelism,
1621            );
1622            if !upserted.is_empty() {
1623                tracing::debug!(
1624                    "Update serving vnode mapping for fragments {:?}.",
1625                    upserted.keys()
1626                );
1627                self.env
1628                    .notification_manager()
1629                    .notify_frontend_without_version(
1630                        Operation::Update,
1631                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1632                            mappings: to_fragment_worker_slot_mapping(&upserted),
1633                        }),
1634                    );
1635            }
1636            if !failed.is_empty() {
1637                tracing::debug!(
1638                    "Failed to update serving vnode mapping for fragments {:?}.",
1639                    failed
1640                );
1641                self.env
1642                    .notification_manager()
1643                    .notify_frontend_without_version(
1644                        Operation::Delete,
1645                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1646                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
1647                        }),
1648                    );
1649            }
1650        }
1651
1652        let mut stream_source_actor_splits = HashMap::new();
1653        let mut stream_source_dropped_actors = HashSet::new();
1654
1655        // todo: handle adaptive splits
1656        for (fragment_id, reschedule) in reschedules {
1657            if !reschedule.actor_splits.is_empty() {
1658                stream_source_actor_splits
1659                    .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
1660                stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
1661            }
1662        }
1663
1664        if !stream_source_actor_splits.is_empty() {
1665            self.source_manager
1666                .apply_source_change(SourceChange::Reschedule {
1667                    split_assignment: stream_source_actor_splits,
1668                    dropped_actors: stream_source_dropped_actors,
1669                })
1670                .await;
1671        }
1672
1673        Ok(())
1674    }
1675
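    /// Generates a [`JobReschedulePlan`] for the given jobs. A minimal sketch of a call
    /// site, assuming a scale controller `ctrl` and a job id `job_id` (both hypothetical
    /// bindings, not part of this module):
    ///
    /// ```ignore
    /// let plan = ctrl
    ///     .generate_job_reschedule_plan(
    ///         JobReschedulePolicy {
    ///             targets: HashMap::from([(
    ///                 job_id,
    ///                 JobRescheduleTarget {
    ///                     parallelism: JobParallelismTarget::Update(TableParallelism::Fixed(4)),
    ///                     resource_group: JobResourceGroupTarget::Keep,
    ///                 },
    ///             )]),
    ///         },
    ///         false, // not restricted to CDC table backfill fragments
    ///     )
    ///     .await?;
    /// ```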
1676    pub async fn generate_job_reschedule_plan(
1677        &self,
1678        policy: JobReschedulePolicy,
1679        generate_plan_for_cdc_table_backfill: bool,
1680    ) -> MetaResult<JobReschedulePlan> {
1681        type VnodeCount = usize;
1682
1683        let JobReschedulePolicy { targets } = policy;
1684
1685        let workers = self
1686            .metadata_manager
1687            .list_active_streaming_compute_nodes()
1688            .await?;
1689
1690        // The `schedulable` filter should eventually be replaced by resource groups, similar to `unschedulable`.
1691        let workers: HashMap<_, _> = workers
1692            .into_iter()
1693            .filter(|worker| worker.is_streaming_schedulable())
1694            .map(|worker| (worker.id, worker))
1695            .collect();
1696
1697        #[derive(Debug)]
1698        struct JobUpdate {
1699            filtered_worker_ids: BTreeSet<WorkerId>,
1700            parallelism: TableParallelism,
1701        }
1702
1703        let mut job_parallelism_updates = HashMap::new();
1704
1705        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
1706            parallelism_updates: Default::default(),
1707            resource_group_updates: Default::default(),
1708        };
1709
1710        for (
1711            job_id,
1712            JobRescheduleTarget {
1713                parallelism: parallelism_update,
1714                resource_group: resource_group_update,
1715            },
1716        ) in &targets
1717        {
1718            let parallelism = match parallelism_update {
1719                JobParallelismTarget::Update(parallelism) => *parallelism,
1720                JobParallelismTarget::Refresh => {
1721                    let parallelism = self
1722                        .metadata_manager
1723                        .catalog_controller
1724                        .get_job_streaming_parallelisms(*job_id as _)
1725                        .await?;
1726
1727                    parallelism.into()
1728                }
1729            };
1730
1731            job_reschedule_post_updates
1732                .parallelism_updates
1733                .insert(TableId::from(*job_id), parallelism);
1734
1735            let current_resource_group = match resource_group_update {
1736                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
1737                    job_reschedule_post_updates.resource_group_updates.insert(
1738                        *job_id as ObjectId,
1739                        Some(specific_resource_group.to_owned()),
1740                    );
1741
1742                    specific_resource_group.to_owned()
1743                }
1744                JobResourceGroupTarget::Update(None) => {
1745                    let database_resource_group = self
1746                        .metadata_manager
1747                        .catalog_controller
1748                        .get_existing_job_database_resource_group(*job_id as _)
1749                        .await?;
1750
1751                    job_reschedule_post_updates
1752                        .resource_group_updates
1753                        .insert(*job_id as ObjectId, None);
1754                    database_resource_group
1755                }
1756                JobResourceGroupTarget::Keep => {
1757                    self.metadata_manager
1758                        .catalog_controller
1759                        .get_existing_job_resource_group(*job_id as _)
1760                        .await?
1761                }
1762            };
1763
1764            let filtered_worker_ids =
1765                filter_workers_by_resource_group(&workers, current_resource_group.as_str());
1766
1767            if filtered_worker_ids.is_empty() {
1768                bail!("Cannot resize streaming job {job_id} to an empty worker set")
1769            }
1770
1771            job_parallelism_updates.insert(
1772                *job_id,
1773                JobUpdate {
1774                    filtered_worker_ids,
1775                    parallelism,
1776                },
1777            );
1778        }
1779
1780        // index for no shuffle relation
1781        let mut no_shuffle_source_fragment_ids = HashSet::new();
1782        let mut no_shuffle_target_fragment_ids = HashSet::new();
1783
1784        // index for fragment_id -> (distribution_type, vnode_count)
1785        let mut fragment_distribution_map = HashMap::new();
1786        // index for actor -> worker id
1787        let mut actor_location = HashMap::new();
1788        // index for table_id -> [fragment_id]
1789        let mut table_fragment_id_map = HashMap::new();
1790        // index for fragment_id -> [actor_id]
1791        let mut fragment_actor_id_map = HashMap::new();
1792
1793        async fn build_index(
1794            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
1795            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
1796            fragment_distribution_map: &mut HashMap<
1797                FragmentId,
1798                (FragmentDistributionType, VnodeCount, bool),
1799            >,
1800            actor_location: &mut HashMap<ActorId, WorkerId>,
1801            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
1802            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
1803            mgr: &MetadataManager,
1804            table_ids: Vec<ObjectId>,
1805            generate_plan_only_for_cdc_table_backfill: bool,
1806        ) -> Result<(), MetaError> {
1807            let RescheduleWorkingSet {
1808                fragments,
1809                actors,
1810                actor_dispatchers: _actor_dispatchers,
1811                fragment_downstreams,
1812                fragment_upstreams: _fragment_upstreams,
1813                related_jobs: _related_jobs,
1814                job_resource_groups: _job_resource_groups,
1815            } = mgr
1816                .catalog_controller
1817                .resolve_working_set_for_reschedule_tables(table_ids)
1818                .await?;
1819
1820            for (fragment_id, downstreams) in fragment_downstreams {
1821                for (downstream_fragment_id, dispatcher_type) in downstreams {
1822                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
1823                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
1824                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
1825                    }
1826                }
1827            }
1828
1829            for (fragment_id, fragment) in fragments {
1830                let is_cdc_backfill_v2_fragment =
1831                    FragmentTypeMask::from(fragment.fragment_type_mask)
1832                        .contains(FragmentTypeFlag::StreamCdcScan);
1833                if generate_plan_only_for_cdc_table_backfill && !is_cdc_backfill_v2_fragment {
1834                    continue;
1835                }
1836                fragment_distribution_map.insert(
1837                    fragment_id as FragmentId,
1838                    (
1839                        FragmentDistributionType::from(fragment.distribution_type),
1840                        fragment.vnode_count as _,
1841                        is_cdc_backfill_v2_fragment,
1842                    ),
1843                );
1844
1845                table_fragment_id_map
1846                    .entry(fragment.job_id as u32)
1847                    .or_default()
1848                    .insert(fragment_id as FragmentId);
1849            }
1850
1851            for (actor_id, actor) in actors {
1852                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
1853                fragment_actor_id_map
1854                    .entry(actor.fragment_id as FragmentId)
1855                    .or_default()
1856                    .insert(actor_id as ActorId);
1857            }
1858
1859            Ok(())
1860        }
1861
1862        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();
1863
1864        build_index(
1865            &mut no_shuffle_source_fragment_ids,
1866            &mut no_shuffle_target_fragment_ids,
1867            &mut fragment_distribution_map,
1868            &mut actor_location,
1869            &mut table_fragment_id_map,
1870            &mut fragment_actor_id_map,
1871            &self.metadata_manager,
1872            table_ids,
1873            generate_plan_for_cdc_table_backfill,
1874        )
1875        .await?;
1876        tracing::debug!(
1877            ?job_reschedule_post_updates,
1878            ?job_parallelism_updates,
1879            ?no_shuffle_source_fragment_ids,
1880            ?no_shuffle_target_fragment_ids,
1881            ?fragment_distribution_map,
1882            ?actor_location,
1883            ?table_fragment_id_map,
1884            ?fragment_actor_id_map,
1885            "generate_table_resize_plan, after build_index"
1886        );
1887
1888        let adaptive_parallelism_strategy = self
1889            .env
1890            .system_params_reader()
1891            .await
1892            .adaptive_parallelism_strategy();
1893
1894        let mut target_plan = HashMap::new();
1895
1896        for (
1897            table_id,
1898            JobUpdate {
1899                filtered_worker_ids,
1900                parallelism,
1901            },
1902        ) in job_parallelism_updates
1903        {
1904            let assigner = AssignerBuilder::new(table_id).build();
1905
1906            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();
1907
1908            let available_worker_slots = workers
1909                .iter()
1910                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
1911                .map(|(_, worker)| {
1912                    (
1913                        worker.id as WorkerId,
1914                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
1915                    )
1916                })
1917                .collect::<BTreeMap<_, _>>();
1918
1919            for fragment_id in fragment_map {
1920                // Currently, NO_SHUFFLE relations are propagated only from upstream to downstream.
1921                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
1922                    continue;
1923                }
1924
1925                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();
1926
1927                for actor_id in &fragment_actor_id_map[&fragment_id] {
1928                    let worker_id = actor_location[actor_id];
1929                    *fragment_slots.entry(worker_id).or_default() += 1;
1930                }
1931
1932                let available_slot_count: usize = available_worker_slots
1933                    .values()
1934                    .cloned()
1935                    .map(NonZeroUsize::get)
1936                    .sum();
1937
1938                if available_slot_count == 0 {
1939                    bail!(
1940                        "No schedulable slots available for fragment {}",
1941                        fragment_id
1942                    );
1943                }
1944
1945                let (dist, vnode_count, is_cdc_backfill_v2_fragment) =
1946                    fragment_distribution_map[&fragment_id];
1947                let max_parallelism = vnode_count;
1948                let fragment_parallelism_strategy = if generate_plan_for_cdc_table_backfill {
1949                    assert!(is_cdc_backfill_v2_fragment);
1950                    let TableParallelism::Fixed(new_parallelism) = parallelism else {
1951                        return Err(anyhow::anyhow!(
1952                            "invalid new parallelism {:?}, expected fixed parallelism",
1953                            parallelism
1954                        )
1955                        .into());
1956                    };
1957                    if new_parallelism > max_parallelism || new_parallelism == 0 {
1958                        return Err(anyhow::anyhow!(
1959                            "invalid new parallelism {}, max parallelism {}",
1960                            new_parallelism,
1961                            max_parallelism
1962                        )
1963                        .into());
1964                    }
1965                    TableParallelism::Fixed(new_parallelism)
1966                } else if is_cdc_backfill_v2_fragment {
1967                    TableParallelism::Fixed(fragment_actor_id_map[&fragment_id].len())
1968                } else {
1969                    parallelism
1970                };
1971                match dist {
1972                    FragmentDistributionType::Unspecified => unreachable!(),
1973                    FragmentDistributionType::Single => {
1974                        let (single_worker_id, should_be_one) = fragment_slots
1975                            .iter()
1976                            .exactly_one()
1977                            .expect("single fragment should have only one worker slot");
1978
1979                        assert_eq!(*should_be_one, 1);
1980
1981                        let assignment =
1982                            assigner.count_actors_per_worker(&available_worker_slots, 1);
1983
1984                        let (chosen_target_worker_id, should_be_one) =
1985                            assignment.iter().exactly_one().ok().with_context(|| {
1986                                format!(
1987                                    "Cannot find a single target worker for fragment {fragment_id}"
1988                                )
1989                            })?;
1990
1991                        assert_eq!(*should_be_one, 1);
1992
1993                        if *chosen_target_worker_id == *single_worker_id {
1994                            tracing::debug!(
1995                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
1996                            );
1997                            continue;
1998                        }
1999
2000                        target_plan.insert(
2001                            fragment_id,
2002                            WorkerReschedule {
2003                                worker_actor_diff: BTreeMap::from_iter(vec![
2004                                    (*chosen_target_worker_id, 1),
2005                                    (*single_worker_id, -1),
2006                                ]),
2007                            },
2008                        );
2009                    }
2010                    FragmentDistributionType::Hash => match fragment_parallelism_strategy {
2011                        TableParallelism::Adaptive => {
2012                            let target_slot_count = adaptive_parallelism_strategy
2013                                .compute_target_parallelism(available_slot_count);
2014
2015                            if target_slot_count > max_parallelism {
2016                                tracing::warn!(
2017                                    "available parallelism for table {table_id} is larger than max parallelism, forcibly limiting to {max_parallelism}"
2018                                );
2019
2020                                let target_worker_slots = assigner.count_actors_per_worker(
2021                                    &available_worker_slots,
2022                                    max_parallelism,
2023                                );
2024
2025                                target_plan.insert(
2026                                    fragment_id,
2027                                    Self::diff_worker_slot_changes(
2028                                        &fragment_slots,
2029                                        &target_worker_slots,
2030                                    ),
2031                                );
2032                            } else if available_slot_count != target_slot_count {
2033                                tracing::info!(
2034                                    "available parallelism for table {table_id} is limited by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
2035                                );
2036
2037                                let target_worker_slots = assigner.count_actors_per_worker(
2038                                    &available_worker_slots,
2039                                    target_slot_count,
2040                                );
2041
2042                                target_plan.insert(
2043                                    fragment_id,
2044                                    Self::diff_worker_slot_changes(
2045                                        &fragment_slots,
2046                                        &target_worker_slots,
2047                                    ),
2048                                );
2049                            } else {
2050                                let available_worker_slots = available_worker_slots
2051                                    .iter()
2052                                    .map(|(worker_id, v)| (*worker_id, v.get()))
2053                                    .collect();
2054
2055                                target_plan.insert(
2056                                    fragment_id,
2057                                    Self::diff_worker_slot_changes(
2058                                        &fragment_slots,
2059                                        &available_worker_slots,
2060                                    ),
2061                                );
2062                            }
2063                        }
2064                        TableParallelism::Fixed(mut n) => {
2065                            if n > max_parallelism {
2066                                tracing::warn!(
2067                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, forcibly limiting to {max_parallelism}"
2068                                );
2069                                n = max_parallelism
2070                            }
2071
2072                            let target_worker_slots =
2073                                assigner.count_actors_per_worker(&available_worker_slots, n);
2074
2075                            target_plan.insert(
2076                                fragment_id,
2077                                Self::diff_worker_slot_changes(
2078                                    &fragment_slots,
2079                                    &target_worker_slots,
2080                                ),
2081                            );
2082                        }
2083                        TableParallelism::Custom => {
2084                            // Skip rescheduling for custom parallelism.
2085                        }
2086                    },
2087                }
2088            }
2089        }
2090
2091        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
2092        tracing::debug!(
2093            ?target_plan,
2094            "generate_table_resize_plan finished target_plan"
2095        );
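        // A CDC-backfill-only plan rebalances backfill fragments in place; leave job-level
        // parallelism and resource-group metadata untouched by clearing the post updates.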
2096        if generate_plan_for_cdc_table_backfill {
2097            job_reschedule_post_updates.resource_group_updates = HashMap::default();
2098            job_reschedule_post_updates.parallelism_updates = HashMap::default();
2099        }
2100        Ok(JobReschedulePlan {
2101            reschedules: target_plan,
2102            post_updates: job_reschedule_post_updates,
2103        })
2104    }
2105
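    /// Computes the per-worker actor-count diff between the current and the target
    /// placement. For example, current slots `{1: 2, 2: 4}` against target slots
    /// `{1: 3, 3: 1}` yield `worker_actor_diff = {1: +1, 2: -4, 3: +1}`.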
2106    fn diff_worker_slot_changes(
2107        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
2108        target_worker_slots: &BTreeMap<WorkerId, usize>,
2109    ) -> WorkerReschedule {
2110        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2111        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2112
2113        for (&worker_id, &target_slots) in target_worker_slots {
2114            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);
2115
2116            if target_slots > current_slots {
2117                increased_actor_count.insert(worker_id, target_slots - current_slots);
2118            }
2119        }
2120
2121        for (&worker_id, &current_slots) in fragment_worker_slots {
2122            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);
2123
2124            if current_slots > target_slots {
2125                decreased_actor_count.insert(worker_id, current_slots - target_slots);
2126            }
2127        }
2128
2129        let worker_ids: HashSet<_> = increased_actor_count
2130            .keys()
2131            .chain(decreased_actor_count.keys())
2132            .cloned()
2133            .collect();
2134
2135        let mut worker_actor_diff = BTreeMap::new();
2136
2137        for worker_id in worker_ids {
2138            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2139            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2140            let change = increased - decreased;
2141
2142            assert_ne!(change, 0);
2143
2144            worker_actor_diff.insert(worker_id, change);
2145        }
2146
2147        WorkerReschedule { worker_actor_diff }
2148    }
2149
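    /// Scans every actor's dispatchers and records, for each NO_SHUFFLE edge, its source
    /// fragment id and its target (downstream) fragment id.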
2150    fn build_no_shuffle_relation_index(
2151        actor_map: &HashMap<ActorId, CustomActorInfo>,
2152        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
2153        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
2154    ) {
2155        let mut fragment_cache = HashSet::new();
2156        for actor in actor_map.values() {
2157            if fragment_cache.contains(&actor.fragment_id) {
2158                continue;
2159            }
2160
2161            for dispatcher in &actor.dispatcher {
2162                for downstream_actor_id in &dispatcher.downstream_actor_id {
2163                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2164                        // Checking for no shuffle dispatchers
2165                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
2166                            no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
2167                            no_shuffle_target_fragment_ids
2168                                .insert(downstream_actor.fragment_id as FragmentId);
2169                        }
2170                    }
2171                }
2172            }
2173
2174            fragment_cache.insert(actor.fragment_id);
2175        }
2176    }
2177
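    /// Builds an index from each fragment to its downstream fragments and the dispatcher
    /// type on each edge, skipping dispatchers whose downstream actor is not in `actor_map`.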
2178    fn build_fragment_dispatcher_index(
2179        actor_map: &HashMap<ActorId, CustomActorInfo>,
2180        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
2181    ) {
2182        for actor in actor_map.values() {
2183            for dispatcher in &actor.dispatcher {
2184                for downstream_actor_id in &dispatcher.downstream_actor_id {
2185                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2186                        fragment_dispatcher_map
2187                            .entry(actor.fragment_id as FragmentId)
2188                            .or_default()
2189                            .insert(
2190                                downstream_actor.fragment_id as FragmentId,
2191                                dispatcher.r#type().into(),
2192                            );
2193                    }
2194                }
2195            }
2196        }
2197    }
2198
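    /// Expands `fragment_ids` along NO_SHUFFLE chains from each target up to its topmost
    /// source, propagating `Custom` parallelism to upstream tables where necessary, and
    /// finally drops the NO_SHUFFLE target tables from `table_parallelisms`, since they
    /// always follow their upstream.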
2199    pub fn resolve_no_shuffle_upstream_tables(
2200        fragment_ids: HashSet<FragmentId>,
2201        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2202        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2203        fragment_to_table: &HashMap<FragmentId, TableId>,
2204        fragment_upstreams: &HashMap<
2205            risingwave_meta_model::FragmentId,
2206            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2207        >,
2208        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
2209    ) -> MetaResult<()> {
2210        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();
2211
2212        let mut fragment_ids = fragment_ids;
2213
2214        // For every NO_SHUFFLE relation, trace upstream from each downstream fragment
2215        // until we reach the top of the hierarchy.
2216        while let Some(fragment_id) = queue.pop_front() {
2217            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2218                continue;
2219            }
2220
2221            // for upstream
2222            for upstream_fragment_id in fragment_upstreams
2223                .get(&(fragment_id as _))
2224                .map(|upstreams| upstreams.keys())
2225                .into_iter()
2226                .flatten()
2227            {
2228                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2229                let upstream_fragment_id = &upstream_fragment_id;
2230                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2231                    continue;
2232                }
2233
2234                let table_id = &fragment_to_table[&fragment_id];
2235                let upstream_table_id = &fragment_to_table[upstream_fragment_id];
2236
2237                // Only custom parallelism will be propagated to the no shuffle upstream.
2238                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
2239                    if let Some(upstream_table_parallelism) =
2240                        table_parallelisms.get(upstream_table_id)
2241                    {
2242                        if upstream_table_parallelism != &TableParallelism::Custom {
2243                            bail!(
2244                                "Cannot change upstream table {} from {:?} to {:?}",
2245                                upstream_table_id,
2246                                upstream_table_parallelism,
2247                                TableParallelism::Custom
2248                            )
2249                        }
2250                    } else {
2251                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
2252                    }
2253                }
2254
2255                fragment_ids.insert(*upstream_fragment_id);
2256                queue.push_back(*upstream_fragment_id);
2257            }
2258        }
2259
2260        let downstream_fragment_ids = fragment_ids
2261            .iter()
2262            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));
2263
2264        let downstream_table_ids = downstream_fragment_ids
2265            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
2266            .collect::<HashSet<_>>();
2267
2268        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));
2269
2270        Ok(())
2271    }
2272
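    /// Propagates each reschedule entry to its NO_SHUFFLE upstream fragments, verifying
    /// that an already-present upstream entry carries the same plan, and then removes the
    /// NO_SHUFFLE targets from `reschedule`, as they are rescheduled together with their
    /// upstream.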
2273    pub fn resolve_no_shuffle_upstream_fragments<T>(
2274        reschedule: &mut HashMap<FragmentId, T>,
2275        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2276        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2277        fragment_upstreams: &HashMap<
2278            risingwave_meta_model::FragmentId,
2279            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2280        >,
2281    ) -> MetaResult<()>
2282    where
2283        T: Clone + Eq,
2284    {
2285        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();
2286
2287        // For every NO_SHUFFLE relation, trace upstream from each downstream fragment
2288        // until we reach the top of the hierarchy.
2289        while let Some(fragment_id) = queue.pop_front() {
2290            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2291                continue;
2292            }
2293
2294            // for upstream
2295            for upstream_fragment_id in fragment_upstreams
2296                .get(&(fragment_id as _))
2297                .map(|upstreams| upstreams.keys())
2298                .into_iter()
2299                .flatten()
2300            {
2301                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2302                let upstream_fragment_id = &upstream_fragment_id;
2303                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2304                    continue;
2305                }
2306
2307                let reschedule_plan = &reschedule[&fragment_id];
2308
2309                if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
2310                    if upstream_reschedule_plan != reschedule_plan {
2311                        bail!(
2312                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
2313                            fragment_id,
2314                            upstream_fragment_id
2315                        );
2316                    }
2317
2318                    continue;
2319                }
2320
2321                reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());
2322
2323                queue.push_back(*upstream_fragment_id);
2324            }
2325        }
2326
2327        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));
2328
2329        Ok(())
2330    }
2331
2332    pub async fn resolve_related_no_shuffle_jobs(
2333        &self,
2334        jobs: &[TableId],
2335    ) -> MetaResult<HashSet<TableId>> {
2336        let RescheduleWorkingSet { related_jobs, .. } = self
2337            .metadata_manager
2338            .catalog_controller
2339            .resolve_working_set_for_reschedule_tables(
2340                jobs.iter().map(|id| id.table_id as _).collect(),
2341            )
2342            .await?;
2343
2344        Ok(related_jobs
2345            .keys()
2346            .map(|id| TableId::new(*id as _))
2347            .collect())
2348    }
2349}
2350
2351#[derive(Debug, Clone)]
2352pub enum JobParallelismTarget {
2353    Update(TableParallelism),
2354    Refresh,
2355}
2356
2357#[derive(Debug, Clone)]
2358pub enum JobResourceGroupTarget {
2359    Update(Option<String>),
2360    Keep,
2361}
2362
2363#[derive(Debug, Clone)]
2364pub struct JobRescheduleTarget {
2365    pub parallelism: JobParallelismTarget,
2366    pub resource_group: JobResourceGroupTarget,
2367}
2368
2369#[derive(Debug)]
2370pub struct JobReschedulePolicy {
2371    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
2372}
2373
2374// final updates for `post_collect`
2375#[derive(Debug, Clone)]
2376pub struct JobReschedulePostUpdates {
2377    pub parallelism_updates: HashMap<TableId, TableParallelism>,
2378    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
2379}
2380
2381#[derive(Debug)]
2382pub struct JobReschedulePlan {
2383    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
2384    pub post_updates: JobReschedulePostUpdates,
2385}
2386
2387impl GlobalStreamManager {
2388    #[await_tree::instrument("acquire_reschedule_read_guard")]
2389    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
2390        self.scale_controller.reschedule_lock.read().await
2391    }
2392
2393    #[await_tree::instrument("acquire_reschedule_write_guard")]
2394    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
2395        self.scale_controller.reschedule_lock.write().await
2396    }
2397
2398    /// The entrypoint of rescheduling actors.
2399    ///
2400    /// Used by:
2401    /// - The directly exposed low-level API `risingwave_meta_service::scale_service::ScaleService` (`risectl meta reschedule`)
2402    /// - High-level parallelism control API
2403    ///     * manual `ALTER [TABLE | INDEX | MATERIALIZED VIEW | SINK] SET PARALLELISM`
2404    ///     * automatic parallelism control for [`TableParallelism::Adaptive`] when worker nodes changed
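    ///
    /// For example, `ALTER MATERIALIZED VIEW mv SET PARALLELISM = 4` eventually reaches
    /// this method with a fixed-parallelism reschedule plan covering all fragments of `mv`.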
2405    pub async fn reschedule_actors(
2406        &self,
2407        database_id: DatabaseId,
2408        plan: JobReschedulePlan,
2409        options: RescheduleOptions,
2410    ) -> MetaResult<()> {
2411        let JobReschedulePlan {
2412            reschedules,
2413            mut post_updates,
2414        } = plan;
2415
2416        let reschedule_fragment = self
2417            .scale_controller
2418            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
2419            .await?;
2420
2421        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
2422
2423        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
2424            .iter()
2425            .flat_map(|(_, reschedule)| {
2426                reschedule
2427                    .upstream_fragment_dispatcher_ids
2428                    .iter()
2429                    .map(|(fragment_id, _)| *fragment_id)
2430                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
2431            })
2432            .collect();
2433
2434        let fragment_actors =
2435            try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
2436                let actor_ids = self
2437                    .metadata_manager
2438                    .get_running_actors_of_fragment(*fragment_id)
2439                    .await?;
2440                Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
2441            }))
2442            .await?
2443            .into_iter()
2444            .collect();
2445
2446        let command = Command::RescheduleFragment {
2447            reschedules: reschedule_fragment,
2448            fragment_actors,
2449            post_updates,
2450        };
2451
2452        let _guard = self.source_manager.pause_tick().await;
2453
2454        self.barrier_scheduler
2455            .run_command(database_id, command)
2456            .await?;
2457
2458        tracing::info!("reschedule done");
2459
2460        Ok(())
2461    }
2462
2463    /// When new worker nodes join, or the parallelism of existing worker nodes changes,
2464    /// examines whether any jobs can be scaled, and scales them if so.
2465    ///
2466    /// This method iterates over all `CREATED` jobs and can be called repeatedly.
2467    ///
2468    /// Returns
2469    /// - `Ok(false)` if no jobs can be scaled;
2470    /// - `Ok(true)` if some jobs were scaled, and there may be more jobs that can be scaled.
2471    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
2472        tracing::info!("trigger parallelism control");
2473
2474        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
2475
2476        let background_streaming_jobs = self
2477            .metadata_manager
2478            .list_background_creating_jobs()
2479            .await?;
2480
2481        let skipped_jobs = if !background_streaming_jobs.is_empty() {
2482            let jobs = self
2483                .scale_controller
2484                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
2485                .await?;
2486
2487            tracing::info!(
2488                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
2489                background_streaming_jobs,
2490                jobs
2491            );
2492
2493            jobs
2494        } else {
2495            HashSet::new()
2496        };
2497
2498        let job_ids: HashSet<_> = {
2499            let streaming_parallelisms = self
2500                .metadata_manager
2501                .catalog_controller
2502                .get_all_streaming_parallelisms()
2503                .await?;
2504
2505            streaming_parallelisms
2506                .into_iter()
2507                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
2508                .map(|(table_id, _)| table_id)
2509                .collect()
2510        };
2511
2512        let workers = self
2513            .metadata_manager
2514            .cluster_controller
2515            .list_active_streaming_workers()
2516            .await?;
2517
2518        let schedulable_worker_ids: BTreeSet<_> = workers
2519            .iter()
2520            .filter(|worker| {
2521                !worker
2522                    .property
2523                    .as_ref()
2524                    .map(|p| p.is_unschedulable)
2525                    .unwrap_or(false)
2526            })
2527            .map(|worker| worker.id as WorkerId)
2528            .collect();
2529
2530        if job_ids.is_empty() {
2531            tracing::info!("no streaming jobs to scale, the cluster may be empty");
2532            return Ok(false);
2533        }
2534
2535        let batch_size = match self.env.opts.parallelism_control_batch_size {
2536            0 => job_ids.len(),
2537            n => n,
2538        };
2539
2540        tracing::info!(
2541            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
2542            job_ids.len(),
2543            batch_size,
2544            schedulable_worker_ids
2545        );
2546
2547        let batches: Vec<_> = job_ids
2548            .into_iter()
2549            .chunks(batch_size)
2550            .into_iter()
2551            .map(|chunk| chunk.collect_vec())
2552            .collect();
2553
2554        let mut reschedules = None;
2555
2556        for batch in batches {
2557            let targets: HashMap<_, _> = batch
2558                .into_iter()
2559                .map(|job_id| {
2560                    (
2561                        job_id as u32,
2562                        JobRescheduleTarget {
2563                            parallelism: JobParallelismTarget::Refresh,
2564                            resource_group: JobResourceGroupTarget::Keep,
2565                        },
2566                    )
2567                })
2568                .collect();
2569
2570            let plan = self
2571                .scale_controller
2572                .generate_job_reschedule_plan(JobReschedulePolicy { targets }, false)
2573                .await?;
2574
2575            if !plan.reschedules.is_empty() {
2576                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
2577                reschedules = Some(plan);
2578                break;
2579            }
2580        }
2581
2582        let Some(plan) = reschedules else {
2583            tracing::info!("no reschedule plan generated");
2584            return Ok(false);
2585        };
2586
2587        // todo
2588        for (database_id, reschedules) in self
2589            .metadata_manager
2590            .split_fragment_map_by_database(plan.reschedules)
2591            .await?
2592        {
2593            self.reschedule_actors(
2594                database_id,
2595                JobReschedulePlan {
2596                    reschedules,
2597                    post_updates: plan.post_updates.clone(),
2598                },
2599                RescheduleOptions {
2600                    resolve_no_shuffle_upstream: false,
2601                    skip_create_new_actors: false,
2602                },
2603            )
2604            .await?;
2605        }
2606
2607        Ok(true)
2608    }
2609
2610    /// Handles notifications of worker node activation and deletion, and triggers parallelism control.
2611    async fn run(&self, mut shutdown_rx: Receiver<()>) {
2612        tracing::info!("starting automatic parallelism control monitor");
2613
2614        let check_period =
2615            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
2616
2617        let mut ticker = tokio::time::interval_at(
2618            Instant::now()
2619                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
2620            check_period,
2621        );
2622        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2623
2624        // waiting for the first tick
2625        ticker.tick().await;

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();
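        // The cache holds the last-seen state of each streaming compute node, so
        // that activation notifications below can be diffed against it.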

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        let mut should_trigger = false;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, will retry on the next tick in {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in stream manager loop");

                    // Only maintain the cache for streaming compute nodes.
                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

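                            // Re-trigger parallelism control only when scheduling-relevant
                            // state changed: the worker's parallelism, its resource group,
                            // or the worker being entirely new. An otherwise unchanged
                            // re-activation falls through without triggering.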
                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = worker.id, "worker resource group changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        // Since our logic for handling passive scale-in is within the barrier manager,
                        // there’s not much we can do here. All we can do is proactively remove the entries from our cache.
                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = worker.id, "deleted worker not found in stream manager cache");
                                }
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}
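
// Illustrative usage from a hypothetical call site (the `stream_manager` binding
// and the shutdown flow are assumptions for this sketch, not part of this file):
//
//     let (join_handle, shutdown_tx) = stream_manager.clone().start_auto_parallelism_monitor();
//     // ... later, during meta node shutdown:
//     let _ = shutdown_tx.send(());
//     let _ = join_handle.await;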