risingwave_meta/stream/scale.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::future;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_meta_model::{
    StreamingParallelism, WorkerId, fragment, fragment_relation, object, streaming_job,
};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher};
use sea_orm::{ConnectionTrait, JoinType, QuerySelect, RelationTrait};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule, RescheduleContext, ReschedulePlan};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble,
    find_fragment_no_shuffle_dags_detailed, load_fragment_context, load_fragment_context_for_jobs,
};
use crate::error::bail_invalid_parameter;
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{ActorId, FragmentId, StreamActor, StreamActorWithDispatchers};
use crate::stream::{GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}
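
// Note on interpretation (`WorkerReschedule` is not exercised elsewhere in this file): each
// `worker_actor_diff` entry appears to encode a per-worker actor-count delta, e.g. an
// illustrative diff of `{worker 1: +2, worker 3: -1}` would request two additional actors on
// worker 1 and one fewer actor on worker 3.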

use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_meta_model::DispatcherType;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{Fragment, FragmentRelation, StreamingJob};
use sea_orm::ActiveValue::Set;
use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait};

use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::{
    StreamingJobExtraInfo, compose_dispatchers, get_streaming_job_extra_info,
};

pub type ScaleControllerRef = Arc<ScaleController>;

pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    /// We acquire this lock during DDL to prevent scaling operations on jobs that are still being created,
    /// e.g., an MV cannot be rescheduled during foreground backfill.
    pub reschedule_lock: RwLock<()>,
}
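
// Sketch of the intended locking protocol around `reschedule_lock` (an assumption based on the
// field comment above and the guard helpers on `GlobalStreamManager` below, not a verbatim
// excerpt of any caller):
//
//     // DDL paths presumably hold the read guard, so concurrent DDL can proceed while any
//     // in-flight reschedule (which holds the write guard) is mutually excluded:
//     let _guard = stream_manager.reschedule_lock_read_guard().await;
//
//     // Scaling paths take the write guard, as `trigger_parallelism_control` does below:
//     let _guard = stream_manager.reschedule_lock_write_guard().await;
//
// `stream_manager` here stands for a `GlobalStreamManager` instance.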

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[JobId],
    ) -> MetaResult<HashSet<JobId>> {
        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(jobs.to_vec()))
            .into_tuple()
            .all(&txn)
            .await?;
        let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
        let related_fragments = ensembles
            .iter()
            .flat_map(|ensemble| ensemble.fragments())
            .collect_vec();

        let job_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(related_fragments))
            .into_tuple()
            .all(&txn)
            .await?;

        let job_ids = job_ids.into_iter().collect();

        Ok(job_ids)
    }
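
    // Illustrative usage (job ids are made up): if a fragment of `job_a` is connected to a
    // fragment of `job_b` through a no-shuffle edge, rescheduling either job must cover both,
    // so callers first expand the job set:
    //
    //     let related = scale_controller
    //         .resolve_related_no_shuffle_jobs(&[job_a])
    //         .await?;
    //     // `related` now contains `job_a` as well as `job_b`.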

    pub async fn reschedule_inplace(
        &self,
        policy: HashMap<JobId, ReschedulePolicy>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        for (table_id, target) in &policy {
            let streaming_job = StreamingJob::find_by_id(*table_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;

            let max_parallelism = streaming_job.max_parallelism;

            let mut streaming_job = streaming_job.into_active_model();

            match &target {
                ReschedulePolicy::Parallelism(p) | ReschedulePolicy::Both(p, _) => {
                    if let StreamingParallelism::Fixed(n) = p.parallelism
                        && n > max_parallelism as usize
                    {
                        bail!(
                            "specified parallelism {n} should not exceed max parallelism {max_parallelism}"
                        );
                    }

                    streaming_job.parallelism = Set(p.parallelism.clone());
                }
                _ => {}
            }

            match &target {
                ReschedulePolicy::ResourceGroup(r) | ReschedulePolicy::Both(_, r) => {
                    streaming_job.specific_resource_group = Set(r.resource_group.clone());
                }
                _ => {}
            }

            StreamingJob::update(streaming_job).exec(&txn).await?;
        }

        let job_ids: HashSet<JobId> = policy.keys().copied().collect();
        let commands = build_reschedule_intent_for_jobs(&txn, job_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }
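
    // Illustrative call (the job id and parallelism are made up):
    //
    //     let mut policy = HashMap::new();
    //     policy.insert(
    //         job_id,
    //         ReschedulePolicy::Parallelism(ParallelismPolicy {
    //             parallelism: StreamingParallelism::Fixed(4),
    //         }),
    //     );
    //     let commands = scale_controller.reschedule_inplace(policy).await?;
    //
    // The result is one `Command::RescheduleIntent` per affected database, which the caller is
    // expected to submit to that database's barrier scheduler (as `trigger_parallelism_control`
    // does below for re-rendered jobs).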

    pub async fn reschedule_fragment_inplace(
        &self,
        policy: HashMap<risingwave_meta_model::FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        if policy.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_id_list = policy.keys().copied().collect_vec();

        let existing_fragment_ids: HashSet<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
            .into_tuple::<risingwave_meta_model::FragmentId>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        if let Some(missing_fragment_id) = fragment_id_list
            .iter()
            .find(|fragment_id| !existing_fragment_ids.contains(*fragment_id))
        {
            return Err(MetaError::catalog_id_not_found(
                "fragment",
                *missing_fragment_id,
            ));
        }

        let mut target_ensembles = vec![];

        for ensemble in find_fragment_no_shuffle_dags_detailed(&txn, &fragment_id_list).await? {
            let entry_fragment_ids = ensemble.entry_fragments().collect_vec();

            let desired_parallelism = match entry_fragment_ids
                .iter()
                .filter_map(|fragment_id| policy.get(fragment_id).cloned())
                .dedup()
                .collect_vec()
                .as_slice()
            {
                [] => {
                    bail_invalid_parameter!(
                        "none of the entry fragments {:?} were included in the reschedule request; \
                         provide at least one entry fragment id",
                        entry_fragment_ids
                    );
                }
                [parallelism] => parallelism.clone(),
                parallelisms => {
                    bail!(
                        "conflicting reschedule policies for fragments in the same no-shuffle ensemble: {:?}",
                        parallelisms
                    );
                }
            };

            let fragments = Fragment::find()
                .filter(fragment::Column::FragmentId.is_in(entry_fragment_ids))
                .all(&txn)
                .await?;

            debug_assert!(
                fragments
                    .iter()
                    .map(|fragment| fragment.parallelism.as_ref())
                    .all_equal(),
                "entry fragments in the same ensemble should share the same parallelism"
            );

            let current_parallelism = fragments
                .first()
                .and_then(|fragment| fragment.parallelism.clone());

            if current_parallelism == desired_parallelism {
                continue;
            }

            for fragment in fragments {
                let mut fragment = fragment.into_active_model();
                fragment.parallelism = Set(desired_parallelism.clone());
                Fragment::update(fragment).exec(&txn).await?;
            }

            target_ensembles.push(ensemble);
        }

        if target_ensembles.is_empty() {
            txn.commit().await?;
            return Ok(HashMap::new());
        }

        let target_fragment_ids: HashSet<FragmentId> = target_ensembles
            .iter()
            .flat_map(|ensemble| ensemble.component_fragments())
            .collect();
        let commands = build_reschedule_intent_for_fragments(&txn, target_fragment_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }
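
    // Illustrative call (fragment ids are made up):
    //
    //     let mut policy = HashMap::new();
    //     policy.insert(fragment_a, Some(StreamingParallelism::Fixed(2)));
    //     policy.insert(fragment_b, None);
    //     let commands = scale_controller.reschedule_fragment_inplace(policy).await?;
    //
    // Reading of the code above: `Some(p)` pins the entry fragments of the affected no-shuffle
    // ensemble to `p`, `None` clears the per-fragment override, and only ensembles whose
    // parallelism actually changes end up in the returned commands.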

    async fn rerender(&self, jobs: HashSet<JobId>) -> MetaResult<HashMap<DatabaseId, Command>> {
        if jobs.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;
        let commands = build_reschedule_intent_for_jobs(&txn, jobs).await?;
        txn.commit().await?;
        Ok(commands)
    }
}

async fn build_reschedule_intent_for_jobs(
    txn: &impl ConnectionTrait,
    job_ids: HashSet<JobId>,
) -> MetaResult<HashMap<DatabaseId, Command>> {
    if job_ids.is_empty() {
        return Ok(HashMap::new());
    }

    let job_id_list = job_ids.iter().copied().collect_vec();
    let database_jobs: Vec<(DatabaseId, JobId)> = StreamingJob::find()
        .select_only()
        .column(object::Column::DatabaseId)
        .column(streaming_job::Column::JobId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_id_list.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    if database_jobs.len() != job_ids.len() {
        let returned_jobs: HashSet<JobId> =
            database_jobs.iter().map(|(_, job_id)| *job_id).collect();
        let missing = job_ids.difference(&returned_jobs).copied().collect_vec();
        return Err(MetaError::catalog_id_not_found(
            "streaming job",
            format!("{missing:?}"),
        ));
    }

    let reschedule_context = load_reschedule_context_for_jobs(txn, job_ids).await?;
    if reschedule_context.is_empty() {
        return Ok(HashMap::new());
    }

    let commands = reschedule_context
        .into_database_contexts()
        .into_iter()
        .map(|(database_id, context)| {
            (
                database_id,
                Command::RescheduleIntent {
                    context,
                    reschedule_plan: None,
                },
            )
        })
        .collect();

    Ok(commands)
}

async fn build_reschedule_intent_for_fragments(
    txn: &impl ConnectionTrait,
    fragment_ids: HashSet<FragmentId>,
) -> MetaResult<HashMap<DatabaseId, Command>> {
    if fragment_ids.is_empty() {
        return Ok(HashMap::new());
    }

    let fragment_id_list = fragment_ids.iter().copied().collect_vec();
    let fragment_databases: Vec<(FragmentId, DatabaseId)> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, fragment::Relation::Object.def())
        .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    if fragment_databases.len() != fragment_ids.len() {
        let returned: HashSet<FragmentId> = fragment_databases
            .iter()
            .map(|(fragment_id, _)| *fragment_id)
            .collect();
        let missing = fragment_ids.difference(&returned).copied().collect_vec();
        return Err(MetaError::catalog_id_not_found(
            "fragment",
            format!("{missing:?}"),
        ));
    }

    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_id_list).await?;
    let reschedule_context = load_reschedule_context_for_ensembles(txn, ensembles).await?;
    if reschedule_context.is_empty() {
        return Ok(HashMap::new());
    }

    let commands = reschedule_context
        .into_database_contexts()
        .into_iter()
        .map(|(database_id, context)| {
            (
                database_id,
                Command::RescheduleIntent {
                    context,
                    reschedule_plan: None,
                },
            )
        })
        .collect();

    Ok(commands)
}

async fn load_reschedule_context_for_jobs(
    txn: &impl ConnectionTrait,
    job_ids: HashSet<JobId>,
) -> MetaResult<RescheduleContext> {
    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
    build_reschedule_context_from_loaded(txn, loaded).await
}

async fn load_reschedule_context_for_ensembles(
    txn: &impl ConnectionTrait,
    ensembles: Vec<NoShuffleEnsemble>,
) -> MetaResult<RescheduleContext> {
    let loaded = load_fragment_context(txn, ensembles).await?;
    build_reschedule_context_from_loaded(txn, loaded).await
}

async fn build_reschedule_context_from_loaded(
    txn: &impl ConnectionTrait,
    loaded: LoadedFragmentContext,
) -> MetaResult<RescheduleContext> {
    if loaded.is_empty() {
        return Ok(RescheduleContext::empty());
    }

    let job_ids = loaded.job_map.keys().copied().collect_vec();
    let job_extra_info = get_streaming_job_extra_info(txn, job_ids).await?;

    let fragment_ids = loaded
        .job_fragments
        .values()
        .flat_map(|fragments| fragments.keys().copied())
        .collect_vec();

    let upstreams: Vec<(FragmentId, FragmentId, DispatcherType)> = FragmentRelation::find()
        .select_only()
        .columns([
            fragment_relation::Column::TargetFragmentId,
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::DispatcherType,
        ])
        .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let mut upstream_fragments = HashMap::new();
    for (fragment, upstream, dispatcher) in upstreams {
        upstream_fragments
            .entry(fragment as FragmentId)
            .or_insert(HashMap::new())
            .insert(upstream as FragmentId, dispatcher);
    }

    let downstreams = FragmentRelation::find()
        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids.clone()))
        .all(txn)
        .await?;

    let mut downstream_fragments = HashMap::new();
    let mut downstream_relations = HashMap::new();
    for relation in downstreams {
        let source_fragment_id = relation.source_fragment_id as FragmentId;
        let target_fragment_id = relation.target_fragment_id as FragmentId;
        downstream_fragments
            .entry(source_fragment_id)
            .or_insert(HashMap::new())
            .insert(target_fragment_id, relation.dispatcher_type);
        downstream_relations.insert((source_fragment_id, target_fragment_id), relation);
    }

    Ok(RescheduleContext {
        loaded,
        job_extra_info,
        upstream_fragments,
        downstream_fragments,
        downstream_relations,
    })
}

/// Build a `Reschedule` by diffing the previously materialized fragment state against
/// the newly rendered actor layout.
///
/// This function assumes a full rebuild (no kept actors) and produces:
/// - actor additions/removals and vnode bitmap updates
/// - dispatcher updates for upstream/downstream fragments
/// - updated split assignments for source actors
///
/// `upstream_fragments`/`downstream_fragments` describe neighbor fragments and dispatcher types,
/// while `all_actor_dispatchers` contains the new dispatcher list for each actor. `job_extra_info`
/// supplies job-level context for building new actors.
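///
/// # Example (illustrative)
///
/// With a previous layout of actors `{1, 2}` and a newly rendered layout of actors `{3, 4, 5}`
/// (a full rebuild, so no ids are kept), the returned `Reschedule` has
/// `removed_actors = {1, 2}`, `added_actors` grouping `{3, 4, 5}` by their target worker, an
/// empty `vnode_bitmap_updates`, and one `newly_created_actors` entry per new actor carrying
/// its dispatchers and worker assignment.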
fn diff_fragment(
    prev_fragment_info: &InflightFragmentInfo,
    curr_actors: &HashMap<ActorId, InflightActorInfo>,
    upstream_fragments: HashMap<FragmentId, DispatcherType>,
    downstream_fragments: HashMap<FragmentId, DispatcherType>,
    all_actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    job_extra_info: Option<&StreamingJobExtraInfo>,
) -> MetaResult<Reschedule> {
    let prev_ids: HashSet<_> = prev_fragment_info.actors.keys().cloned().collect();
    let curr_ids: HashSet<_> = curr_actors.keys().cloned().collect();

    let removed_actors: HashSet<_> = &prev_ids - &curr_ids;
    let added_actor_ids: HashSet<_> = &curr_ids - &prev_ids;
    let kept_ids: HashSet<_> = prev_ids.intersection(&curr_ids).cloned().collect();
    debug_assert!(
        kept_ids.is_empty(),
        "kept actors found in scale; expected full rebuild, prev={prev_ids:?}, curr={curr_ids:?}, kept={kept_ids:?}"
    );

    let mut added_actors = HashMap::new();
    for &actor_id in &added_actor_ids {
        let InflightActorInfo { worker_id, .. } = curr_actors
            .get(&actor_id)
            .ok_or_else(|| anyhow!("BUG: Worker not found for new actor {}", actor_id))?;

        added_actors
            .entry(*worker_id)
            .or_insert_with(Vec::new)
            .push(actor_id);
    }

    let mut vnode_bitmap_updates = HashMap::new();
    for actor_id in kept_ids {
        let prev_actor = &prev_fragment_info.actors[&actor_id];
        let curr_actor = &curr_actors[&actor_id];

        // Check if the vnode distribution has changed.
        if prev_actor.vnode_bitmap != curr_actor.vnode_bitmap
            && let Some(bitmap) = curr_actor.vnode_bitmap.clone()
        {
            vnode_bitmap_updates.insert(actor_id, bitmap);
        }
    }

    let upstream_dispatcher_mapping =
        if let DistributionType::Hash = prev_fragment_info.distribution_type {
            let actor_mapping = curr_actors
                .iter()
                .map(
                    |(
                        actor_id,
                        InflightActorInfo {
                            worker_id: _,
                            vnode_bitmap,
                            ..
                        },
                    )| { (*actor_id, vnode_bitmap.clone().unwrap()) },
                )
                .collect();
            Some(ActorMapping::from_bitmaps(&actor_mapping))
        } else {
            None
        };

    let upstream_fragment_dispatcher_ids = upstream_fragments
        .iter()
        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
        .map(|(upstream_fragment, _)| (*upstream_fragment, prev_fragment_info.fragment_id))
        .collect();

    let downstream_fragment_ids = downstream_fragments
        .iter()
        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
        .map(|(fragment_id, _)| *fragment_id)
        .collect();

    let extra_info = job_extra_info.cloned().unwrap_or_default();
    let expr_context = extra_info.stream_context().to_expr_context();
    let job_definition = extra_info.job_definition;
    let config_override = extra_info.config_override;

    let newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)> =
        added_actor_ids
            .iter()
            .map(|actor_id| {
                let actor = StreamActor {
                    actor_id: *actor_id,
                    fragment_id: prev_fragment_info.fragment_id,
                    vnode_bitmap: curr_actors[actor_id].vnode_bitmap.clone(),
                    mview_definition: job_definition.clone(),
                    expr_context: Some(expr_context.clone()),
                    config_override: config_override.clone(),
                };
                (
                    *actor_id,
                    (
                        (
                            actor,
                            all_actor_dispatchers
                                .get(actor_id)
                                .cloned()
                                .unwrap_or_default(),
                        ),
                        curr_actors[actor_id].worker_id,
                    ),
                )
            })
            .collect();

    let actor_splits = curr_actors
        .iter()
        .map(|(&actor_id, info)| (actor_id, info.splits.clone()))
        .collect();

    let reschedule = Reschedule {
        added_actors,
        removed_actors,
        vnode_bitmap_updates,
        upstream_fragment_dispatcher_ids,
        upstream_dispatcher_mapping,
        downstream_fragment_ids,
        actor_splits,
        newly_created_actors,
    };

    Ok(reschedule)
}

pub(crate) fn build_reschedule_commands(
    render_result: FragmentRenderMap,
    context: RescheduleContext,
    all_prev_fragments: HashMap<FragmentId, &InflightFragmentInfo>,
) -> MetaResult<HashMap<DatabaseId, ReschedulePlan>> {
    if render_result.is_empty() {
        return Ok(HashMap::new());
    }

    let RescheduleContext {
        job_extra_info,
        upstream_fragments: mut all_upstream_fragments,
        downstream_fragments: mut all_downstream_fragments,
        mut downstream_relations,
        ..
    } = context;

    let fragment_ids = render_result
        .values()
        .flat_map(|jobs| jobs.values())
        .flatten()
        .map(|(fragment_id, _)| *fragment_id)
        .collect_vec();

    let all_related_fragment_ids: HashSet<_> = fragment_ids
        .iter()
        .copied()
        .chain(all_upstream_fragments.values().flatten().map(|(id, _)| *id))
        .chain(
            all_downstream_fragments
                .values()
                .flatten()
                .map(|(id, _)| *id),
        )
        .collect();

    for fragment_id in all_related_fragment_ids {
        if !all_prev_fragments.contains_key(&fragment_id) {
            return Err(MetaError::from(anyhow!(
                "previous fragment info for {fragment_id} not found"
            )));
        }
    }

    let all_rendered_fragments: HashMap<_, _> = render_result
        .values()
        .flat_map(|jobs| jobs.values())
        .flatten()
        .map(|(fragment_id, info)| (*fragment_id, info))
        .collect();

    let mut commands = HashMap::new();

    for (database_id, jobs) in &render_result {
        let mut all_fragment_actors = HashMap::new();
        let mut reschedules = HashMap::new();

        for (job_id, fragment_id, fragment_info) in jobs.iter().flat_map(|(job_id, fragments)| {
            fragments
                .iter()
                .map(move |(fragment_id, info)| (job_id, fragment_id, info))
        }) {
            let InflightFragmentInfo {
                distribution_type,
                actors,
                ..
            } = fragment_info;

            let upstream_fragments = all_upstream_fragments
                .remove(&(*fragment_id as FragmentId))
                .unwrap_or_default();
            let downstream_fragments = all_downstream_fragments
                .remove(&(*fragment_id as FragmentId))
                .unwrap_or_default();

            let fragment_actors: HashMap<_, _> = upstream_fragments
                .keys()
                .copied()
                .chain(downstream_fragments.keys().copied())
                .map(|fragment_id| {
                    all_prev_fragments
                        .get(&fragment_id)
                        .map(|fragment| {
                            (
                                fragment_id,
                                fragment.actors.keys().copied().collect::<HashSet<_>>(),
                            )
                        })
                        .ok_or_else(|| {
                            MetaError::from(anyhow!(
                                "fragment {} not found in previous state",
                                fragment_id
                            ))
                        })
                })
                .collect::<MetaResult<_>>()?;

            all_fragment_actors.extend(fragment_actors);

            let source_fragment_actors = actors
                .iter()
                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                .collect();

            let mut all_actor_dispatchers: HashMap<_, Vec<_>> = HashMap::new();

            for downstream_fragment_id in downstream_fragments.keys() {
                let target_fragment_actors =
                    match all_rendered_fragments.get(downstream_fragment_id) {
                        None => {
                            let external_fragment = all_prev_fragments
                                .get(downstream_fragment_id)
                                .ok_or_else(|| {
                                    MetaError::from(anyhow!(
                                        "fragment {} not found in previous state",
                                        downstream_fragment_id
                                    ))
                                })?;

                            external_fragment
                                .actors
                                .iter()
                                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                                .collect()
                        }
                        Some(downstream_rendered) => downstream_rendered
                            .actors
                            .iter()
                            .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                            .collect(),
                    };

                let target_fragment_distribution = *distribution_type;

                let fragment_relation::Model {
                    source_fragment_id: _,
                    target_fragment_id: _,
                    dispatcher_type,
                    dist_key_indices,
                    output_indices,
                    output_type_mapping,
                } = downstream_relations
                    .remove(&(
                        *fragment_id as FragmentId,
                        *downstream_fragment_id as FragmentId,
                    ))
                    .ok_or_else(|| {
                        MetaError::from(anyhow!(
                            "downstream relation missing for {} -> {}",
                            fragment_id,
                            downstream_fragment_id
                        ))
                    })?;

                let pb_mapping = PbDispatchOutputMapping {
                    indices: output_indices.into_u32_array(),
                    types: output_type_mapping.unwrap_or_default().to_protobuf(),
                };

                let dispatchers = compose_dispatchers(
                    *distribution_type,
                    &source_fragment_actors,
                    *downstream_fragment_id,
                    target_fragment_distribution,
                    &target_fragment_actors,
                    dispatcher_type,
                    dist_key_indices.into_u32_array(),
                    pb_mapping,
                );

                for (actor_id, dispatcher) in dispatchers {
                    all_actor_dispatchers
                        .entry(actor_id)
                        .or_default()
                        .push(dispatcher);
                }
            }

            let prev_fragment = all_prev_fragments.get(&{ *fragment_id }).ok_or_else(|| {
                MetaError::from(anyhow!(
                    "fragment {} not found in previous state",
                    fragment_id
                ))
            })?;

            let reschedule = diff_fragment(
                prev_fragment,
                actors,
                upstream_fragments,
                downstream_fragments,
                all_actor_dispatchers,
                job_extra_info.get(job_id),
            )?;

            reschedules.insert(*fragment_id as FragmentId, reschedule);
        }

        let command = ReschedulePlan {
            reschedules,
            fragment_actors: all_fragment_actors,
        };

        debug_assert!(
            command
                .reschedules
                .values()
                .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
            "reschedule plan carries vnode_bitmap_updates, expected full rebuild"
        );

        commands.insert(*database_id, command);
    }

    Ok(commands)
}
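
// A sketch of how the pieces in this file are expected to fit together (the exact driver lives
// in the barrier manager and is not shown here):
//
// 1. `build_reschedule_intent_for_jobs` / `build_reschedule_intent_for_fragments` load a
//    `RescheduleContext` inside a transaction and wrap it into
//    `Command::RescheduleIntent { context, reschedule_plan: None }` per database.
// 2. When the intent is executed, the new actor layout is rendered into a `FragmentRenderMap`,
//    and `build_reschedule_commands` diffs it against the previous in-flight fragment infos to
//    produce one `ReschedulePlan` per database.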

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParallelismPolicy {
    pub parallelism: StreamingParallelism,
}

#[derive(Clone, Debug)]
pub struct ResourceGroupPolicy {
    pub resource_group: Option<String>,
}

#[derive(Clone, Debug)]
pub enum ReschedulePolicy {
    Parallelism(ParallelismPolicy),
    ResourceGroup(ResourceGroupPolicy),
    Both(ParallelismPolicy, ResourceGroupPolicy),
}
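
// Constructing a policy (illustrative values):
//
//     let policy = ReschedulePolicy::Both(
//         ParallelismPolicy {
//             parallelism: StreamingParallelism::Fixed(8),
//         },
//         ResourceGroupPolicy {
//             resource_group: Some("rg_batch".to_owned()),
//         },
//     );
//
// `Parallelism` only updates the job's parallelism, `ResourceGroup` only updates its specific
// resource group (clearing it when `None`), and `Both` applies the two changes in a single
// `reschedule_inplace` call above.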

impl GlobalStreamManager {
    #[await_tree::instrument("acquire_reschedule_read_guard")]
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    #[await_tree::instrument("acquire_reschedule_write_guard")]
    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }

    /// When new worker nodes join, or the parallelism of existing worker nodes changes,
    /// examines whether any jobs can be scaled, and scales them if so.
    ///
    /// This method iterates over all `CREATED` jobs and can be called repeatedly.
    ///
    /// Returns
    /// - `Ok(false)` if no jobs can be scaled;
    /// - `Ok(true)` if some jobs were scaled, and there may be more jobs that can be scaled.
    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
        tracing::info!("trigger parallelism control");

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_streaming_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        let unreschedulable_jobs = self
            .metadata_manager
            .collect_unreschedulable_backfill_jobs(&background_streaming_jobs, true)
            .await?;

        let database_objects: HashMap<risingwave_meta_model::DatabaseId, Vec<JobId>> = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_with_database()
            .await?;

        let job_ids = database_objects
            .iter()
            .flat_map(|(database_id, job_ids)| {
                job_ids
                    .iter()
                    .enumerate()
                    .map(move |(idx, job_id)| (idx, database_id, job_id))
            })
            .sorted_by(|(idx_a, database_a, _), (idx_b, database_b, _)| {
                idx_a.cmp(idx_b).then(database_a.cmp(database_b))
            })
            .map(|(_, database_id, job_id)| (*database_id, *job_id))
            .filter(|(_, job_id)| !unreschedulable_jobs.contains(job_id))
            .collect_vec();

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
            return Ok(false);
        }

        let active_workers =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        tracing::info!(
            "trigger parallelism control for jobs: {:#?}, workers {:#?}",
            job_ids,
            active_workers.current()
        );

        let batch_size = match self.env.opts.parallelism_control_batch_size {
            0 => job_ids.len(),
            n => n,
        };
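
        // For example, with 10 jobs: a configured batch size of 0 yields a single batch of all
        // 10 jobs, while a batch size of 4 yields batches of sizes [4, 4, 2].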

        tracing::info!(
            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
            job_ids.len(),
            batch_size,
            active_workers.current()
        );

        let batches: Vec<_> = job_ids
            .into_iter()
            .chunks(batch_size)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect();

        for batch in batches {
            let jobs = batch.iter().map(|(_, job_id)| *job_id).collect();

            let commands = self.scale_controller.rerender(jobs).await?;

            let futures = commands.into_iter().map(|(database_id, command)| {
                let barrier_scheduler = self.barrier_scheduler.clone();
                async move { barrier_scheduler.run_command(database_id, command).await }
            });

            let _results = future::try_join_all(futures).await?;
        }

        Ok(false)
    }

    /// Handles notifications of worker node activation and deletion, and triggers parallelism control.
    async fn run(&self, mut shutdown_rx: Receiver<()>) {
        tracing::info!("starting automatic parallelism control monitor");

        let check_period =
            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

        let mut ticker = tokio::time::interval_at(
            Instant::now()
                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
            check_period,
        );
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        // Wait for the first tick.
        ticker.tick().await;

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        let mut should_trigger = false;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    // Only maintain the cache for streaming compute nodes.
                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = %worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = %worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = %worker.id, "worker label changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = %worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        // Since our logic for handling passive scale-in is within the barrier manager,
                        // there’s not much we can do here. All we can do is proactively remove the entries from our cache.
                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = %prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = %worker.id, "worker not found in stream manager cache, but it was removed");
                                }
                            }
                        }

                        LocalNotification::StreamingJobBackfillFinished(job_id) => {
                            tracing::debug!(job_id = %job_id, "received backfill finished notification");
                            if let Err(e) = self.apply_post_backfill_parallelism(job_id).await {
                                tracing::warn!(job_id = %job_id, error = %e.as_report(), "failed to restore parallelism after backfill");
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

    /// Restores a streaming job's parallelism to its target value after backfill completes.
    async fn apply_post_backfill_parallelism(&self, job_id: JobId) -> MetaResult<()> {
        // Fetch both the target parallelism (final desired state) and the backfill parallelism
        // (temporary parallelism used during backfill phase) from the catalog.
        let (target, backfill_parallelism) = self
            .metadata_manager
            .catalog_controller
            .get_job_parallelisms(job_id)
            .await?;

        // Determine if we need to reschedule based on the backfill configuration.
        match backfill_parallelism {
            Some(backfill_parallelism) if backfill_parallelism == target => {
                // Backfill parallelism matches target - no reschedule needed since the job
                // is already running at the desired parallelism.
                tracing::debug!(
                    job_id = %job_id,
                    ?backfill_parallelism,
                    ?target,
                    "backfill parallelism equals job parallelism, skip reschedule"
                );
                return Ok(());
            }
            Some(_) => {
                // Backfill parallelism differs from target - proceed to restore target parallelism.
            }
            None => {
                // No backfill parallelism was configured, meaning the job was created without
                // a special backfill override. No reschedule is necessary.
                tracing::debug!(
                    job_id = %job_id,
                    ?target,
                    "no backfill parallelism configured, skip post-backfill reschedule"
                );
                return Ok(());
            }
        }

        // Reschedule the job to restore its target parallelism.
        tracing::info!(
            job_id = %job_id,
            ?target,
            ?backfill_parallelism,
            "restoring parallelism after backfill via reschedule"
        );
        let policy = ReschedulePolicy::Parallelism(ParallelismPolicy {
            parallelism: target,
        });
        if let Err(e) = self.reschedule_streaming_job(job_id, policy, false).await {
            tracing::warn!(job_id = %job_id, error = %e.as_report(), "reschedule after backfill failed");
            return Err(e);
        }

        tracing::info!(job_id = %job_id, "parallelism reschedule after backfill submitted");
        Ok(())
    }

    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}