use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::future;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_meta_model::{
    StreamingParallelism, WorkerId, fragment, fragment_relation, object, streaming_job,
};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher};
use sea_orm::{ConnectionTrait, JoinType, QuerySelect, RelationTrait};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule, RescheduleContext, ReschedulePlan};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble,
    find_fragment_no_shuffle_dags_detailed, load_fragment_context, load_fragment_context_for_jobs,
};
use crate::error::bail_invalid_parameter;
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{ActorId, FragmentId, StreamActor, StreamActorWithDispatchers};
use crate::stream::{GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

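/// A reschedule request expressed as per-worker actor-count deltas: for each worker,
/// how many actors should be added (positive) or removed (negative).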
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}

use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_meta_model::DispatcherType;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{Fragment, FragmentRelation, StreamingJob};
use sea_orm::ActiveValue::Set;
use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait};

use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::{
    StreamingJobExtraInfo, compose_dispatchers, get_streaming_job_extra_info,
};

pub type ScaleControllerRef = Arc<ScaleController>;

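/// Coordinates rescheduling (scaling) of streaming jobs: it persists parallelism and
/// resource-group changes to the catalog and turns them into per-database
/// `Command::RescheduleIntent` commands.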
pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

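    /// Expands the given job ids to the full set of jobs connected to them through
    /// no-shuffle dependencies, so that they can be rescheduled together.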
    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[JobId],
    ) -> MetaResult<HashSet<JobId>> {
        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(jobs.to_vec()))
            .into_tuple()
            .all(&txn)
            .await?;
        let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
        let related_fragments = ensembles
            .iter()
            .flat_map(|ensemble| ensemble.fragments())
            .collect_vec();

        let job_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(related_fragments))
            .into_tuple()
            .all(&txn)
            .await?;

        let job_ids = job_ids.into_iter().collect();

        Ok(job_ids)
    }

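    /// Applies the given reschedule policies by updating each job's parallelism and/or
    /// resource group in the catalog, then builds per-database reschedule intent commands
    /// for all affected jobs within the same transaction.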
    pub async fn reschedule_inplace(
        &self,
        policy: HashMap<JobId, ReschedulePolicy>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        for (table_id, target) in &policy {
            let streaming_job = StreamingJob::find_by_id(*table_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;

            let max_parallelism = streaming_job.max_parallelism;

            let mut streaming_job = streaming_job.into_active_model();

            match &target {
                ReschedulePolicy::Parallelism(p) | ReschedulePolicy::Both(p, _) => {
                    if let StreamingParallelism::Fixed(n) = p.parallelism
                        && n > max_parallelism as usize
                    {
                        bail!(format!(
                            "specified parallelism {n} should not exceed max parallelism {max_parallelism}"
                        ));
                    }

                    streaming_job.parallelism = Set(p.parallelism.clone());
                }
                _ => {}
            }

            match &target {
                ReschedulePolicy::ResourceGroup(r) | ReschedulePolicy::Both(_, r) => {
                    streaming_job.specific_resource_group = Set(r.resource_group.clone());
                }
                _ => {}
            }

            StreamingJob::update(streaming_job).exec(&txn).await?;
        }

        let job_ids: HashSet<JobId> = policy.keys().copied().collect();
        let commands = build_reschedule_intent_for_jobs(&txn, job_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }

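    /// Reschedules individual fragments: for each affected no-shuffle ensemble, the desired
    /// parallelism (taken from its entry fragments) is persisted on those entry fragments,
    /// and reschedule intent commands are built for every fragment in the ensemble.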
    pub async fn reschedule_fragment_inplace(
        &self,
        policy: HashMap<risingwave_meta_model::FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        if policy.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_id_list = policy.keys().copied().collect_vec();

        let existing_fragment_ids: HashSet<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
            .into_tuple::<risingwave_meta_model::FragmentId>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        if let Some(missing_fragment_id) = fragment_id_list
            .iter()
            .find(|fragment_id| !existing_fragment_ids.contains(*fragment_id))
        {
            return Err(MetaError::catalog_id_not_found(
                "fragment",
                *missing_fragment_id,
            ));
        }

        let mut target_ensembles = vec![];

        for ensemble in find_fragment_no_shuffle_dags_detailed(&txn, &fragment_id_list).await? {
            let entry_fragment_ids = ensemble.entry_fragments().collect_vec();

            let desired_parallelism = match entry_fragment_ids
                .iter()
                .filter_map(|fragment_id| policy.get(fragment_id).cloned())
                .dedup()
                .collect_vec()
                .as_slice()
            {
                [] => {
                    bail_invalid_parameter!(
                        "none of the entry fragments {:?} were included in the reschedule request; \
                        provide at least one entry fragment id",
                        entry_fragment_ids
                    );
                }
                [parallelism] => parallelism.clone(),
                parallelisms => {
                    bail!(
                        "conflicting reschedule policies for fragments in the same no-shuffle ensemble: {:?}",
                        parallelisms
                    );
                }
            };

            let fragments = Fragment::find()
                .filter(fragment::Column::FragmentId.is_in(entry_fragment_ids))
                .all(&txn)
                .await?;

            debug_assert!(
                fragments
                    .iter()
                    .map(|fragment| fragment.parallelism.as_ref())
                    .all_equal(),
                "entry fragments in the same ensemble should share the same parallelism"
            );

            let current_parallelism = fragments
                .first()
                .and_then(|fragment| fragment.parallelism.clone());

            if current_parallelism == desired_parallelism {
                continue;
            }

            for fragment in fragments {
                let mut fragment = fragment.into_active_model();
                fragment.parallelism = Set(desired_parallelism.clone());
                Fragment::update(fragment).exec(&txn).await?;
            }

            target_ensembles.push(ensemble);
        }

        if target_ensembles.is_empty() {
            txn.commit().await?;
            return Ok(HashMap::new());
        }

        let target_fragment_ids: HashSet<FragmentId> = target_ensembles
            .iter()
            .flat_map(|ensemble| ensemble.component_fragments())
            .collect();
        let commands = build_reschedule_intent_for_fragments(&txn, target_fragment_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }

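    /// Rebuilds reschedule intent commands for the given jobs from the current catalog state,
    /// without changing any persisted parallelism settings.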
    async fn rerender(&self, jobs: HashSet<JobId>) -> MetaResult<HashMap<DatabaseId, Command>> {
        if jobs.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;
        let commands = build_reschedule_intent_for_jobs(&txn, jobs).await?;
        txn.commit().await?;
        Ok(commands)
    }
}

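/// Builds a `Command::RescheduleIntent` per database for the given jobs. Fails with a
/// catalog-not-found error if any of the job ids does not exist.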
async fn build_reschedule_intent_for_jobs(
    txn: &impl ConnectionTrait,
    job_ids: HashSet<JobId>,
) -> MetaResult<HashMap<DatabaseId, Command>> {
    if job_ids.is_empty() {
        return Ok(HashMap::new());
    }

    let job_id_list = job_ids.iter().copied().collect_vec();
    let database_jobs: Vec<(DatabaseId, JobId)> = StreamingJob::find()
        .select_only()
        .column(object::Column::DatabaseId)
        .column(streaming_job::Column::JobId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_id_list.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    if database_jobs.len() != job_ids.len() {
        let returned_jobs: HashSet<JobId> =
            database_jobs.iter().map(|(_, job_id)| *job_id).collect();
        let missing = job_ids.difference(&returned_jobs).copied().collect_vec();
        return Err(MetaError::catalog_id_not_found(
            "streaming job",
            format!("{missing:?}"),
        ));
    }

    let reschedule_context = load_reschedule_context_for_jobs(txn, job_ids).await?;
    if reschedule_context.is_empty() {
        return Ok(HashMap::new());
    }

    let commands = reschedule_context
        .into_database_contexts()
        .into_iter()
        .map(|(database_id, context)| {
            (
                database_id,
                Command::RescheduleIntent {
                    context,
                    reschedule_plan: None,
                },
            )
        })
        .collect();

    Ok(commands)
}

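/// Builds a `Command::RescheduleIntent` per database for the no-shuffle ensembles containing
/// the given fragments. Fails with a catalog-not-found error if any fragment id does not exist.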
async fn build_reschedule_intent_for_fragments(
    txn: &impl ConnectionTrait,
    fragment_ids: HashSet<FragmentId>,
) -> MetaResult<HashMap<DatabaseId, Command>> {
    if fragment_ids.is_empty() {
        return Ok(HashMap::new());
    }

    let fragment_id_list = fragment_ids.iter().copied().collect_vec();
    let fragment_databases: Vec<(FragmentId, DatabaseId)> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, fragment::Relation::Object.def())
        .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    if fragment_databases.len() != fragment_ids.len() {
        let returned: HashSet<FragmentId> = fragment_databases
            .iter()
            .map(|(fragment_id, _)| *fragment_id)
            .collect();
        let missing = fragment_ids.difference(&returned).copied().collect_vec();
        return Err(MetaError::catalog_id_not_found(
            "fragment",
            format!("{missing:?}"),
        ));
    }

    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_id_list).await?;
    let reschedule_context = load_reschedule_context_for_ensembles(txn, ensembles).await?;
    if reschedule_context.is_empty() {
        return Ok(HashMap::new());
    }

    let commands = reschedule_context
        .into_database_contexts()
        .into_iter()
        .map(|(database_id, context)| {
            (
                database_id,
                Command::RescheduleIntent {
                    context,
                    reschedule_plan: None,
                },
            )
        })
        .collect();

    Ok(commands)
}

async fn load_reschedule_context_for_jobs(
    txn: &impl ConnectionTrait,
    job_ids: HashSet<JobId>,
) -> MetaResult<RescheduleContext> {
    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
    build_reschedule_context_from_loaded(txn, loaded).await
}

async fn load_reschedule_context_for_ensembles(
    txn: &impl ConnectionTrait,
    ensembles: Vec<NoShuffleEnsemble>,
) -> MetaResult<RescheduleContext> {
    let loaded = load_fragment_context(txn, ensembles).await?;
    build_reschedule_context_from_loaded(txn, loaded).await
}

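/// Enriches the loaded fragment context with per-job extra info and the upstream/downstream
/// fragment relations (with dispatcher types) needed to later diff fragments and recompose
/// dispatchers.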
async fn build_reschedule_context_from_loaded(
    txn: &impl ConnectionTrait,
    loaded: LoadedFragmentContext,
) -> MetaResult<RescheduleContext> {
    if loaded.is_empty() {
        return Ok(RescheduleContext::empty());
    }

    let job_ids = loaded.job_map.keys().copied().collect_vec();
    let job_extra_info = get_streaming_job_extra_info(txn, job_ids).await?;

    let fragment_ids = loaded
        .job_fragments
        .values()
        .flat_map(|fragments| fragments.keys().copied())
        .collect_vec();

    let upstreams: Vec<(FragmentId, FragmentId, DispatcherType)> = FragmentRelation::find()
        .select_only()
        .columns([
            fragment_relation::Column::TargetFragmentId,
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::DispatcherType,
        ])
        .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let mut upstream_fragments = HashMap::new();
    for (fragment, upstream, dispatcher) in upstreams {
        upstream_fragments
            .entry(fragment as FragmentId)
            .or_insert(HashMap::new())
            .insert(upstream as FragmentId, dispatcher);
    }

    let downstreams = FragmentRelation::find()
        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids.clone()))
        .all(txn)
        .await?;

    let mut downstream_fragments = HashMap::new();
    let mut downstream_relations = HashMap::new();
    for relation in downstreams {
        let source_fragment_id = relation.source_fragment_id as FragmentId;
        let target_fragment_id = relation.target_fragment_id as FragmentId;
        downstream_fragments
            .entry(source_fragment_id)
            .or_insert(HashMap::new())
            .insert(target_fragment_id, relation.dispatcher_type);
        downstream_relations.insert((source_fragment_id, target_fragment_id), relation);
    }

    Ok(RescheduleContext {
        loaded,
        job_extra_info,
        upstream_fragments,
        downstream_fragments,
        downstream_relations,
    })
}

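/// Computes the `Reschedule` delta for a single fragment by comparing its previous actor set
/// against the newly rendered one: which actors are added or removed per worker, the updated
/// upstream dispatcher mapping, and the full definitions of newly created actors.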
fn diff_fragment(
    prev_fragment_info: &InflightFragmentInfo,
    curr_actors: &HashMap<ActorId, InflightActorInfo>,
    upstream_fragments: HashMap<FragmentId, DispatcherType>,
    downstream_fragments: HashMap<FragmentId, DispatcherType>,
    all_actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    job_extra_info: Option<&StreamingJobExtraInfo>,
) -> MetaResult<Reschedule> {
    let prev_ids: HashSet<_> = prev_fragment_info.actors.keys().cloned().collect();
    let curr_ids: HashSet<_> = curr_actors.keys().cloned().collect();

    let removed_actors: HashSet<_> = &prev_ids - &curr_ids;
    let added_actor_ids: HashSet<_> = &curr_ids - &prev_ids;
    let kept_ids: HashSet<_> = prev_ids.intersection(&curr_ids).cloned().collect();
    debug_assert!(
        kept_ids.is_empty(),
        "kept actors found in scale; expected full rebuild, prev={prev_ids:?}, curr={curr_ids:?}, kept={kept_ids:?}"
    );

    let mut added_actors = HashMap::new();
    for &actor_id in &added_actor_ids {
        let InflightActorInfo { worker_id, .. } = curr_actors
            .get(&actor_id)
            .ok_or_else(|| anyhow!("BUG: Worker not found for new actor {}", actor_id))?;

        added_actors
            .entry(*worker_id)
            .or_insert_with(Vec::new)
            .push(actor_id);
    }

    let mut vnode_bitmap_updates = HashMap::new();
    for actor_id in kept_ids {
        let prev_actor = &prev_fragment_info.actors[&actor_id];
        let curr_actor = &curr_actors[&actor_id];

        if prev_actor.vnode_bitmap != curr_actor.vnode_bitmap
            && let Some(bitmap) = curr_actor.vnode_bitmap.clone()
        {
            vnode_bitmap_updates.insert(actor_id, bitmap);
        }
    }

    let upstream_dispatcher_mapping =
        if let DistributionType::Hash = prev_fragment_info.distribution_type {
            let actor_mapping = curr_actors
                .iter()
                .map(
                    |(
                        actor_id,
                        InflightActorInfo {
                            worker_id: _,
                            vnode_bitmap,
                            ..
                        },
                    )| { (*actor_id, vnode_bitmap.clone().unwrap()) },
                )
                .collect();
            Some(ActorMapping::from_bitmaps(&actor_mapping))
        } else {
            None
        };

    let upstream_fragment_dispatcher_ids = upstream_fragments
        .iter()
        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
        .map(|(upstream_fragment, _)| (*upstream_fragment, prev_fragment_info.fragment_id))
        .collect();

    let downstream_fragment_ids = downstream_fragments
        .iter()
        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
        .map(|(fragment_id, _)| *fragment_id)
        .collect();

    let extra_info = job_extra_info.cloned().unwrap_or_default();
    let expr_context = extra_info.stream_context().to_expr_context();
    let job_definition = extra_info.job_definition;
    let config_override = extra_info.config_override;

    let newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)> =
        added_actor_ids
            .iter()
            .map(|actor_id| {
                let actor = StreamActor {
                    actor_id: *actor_id,
                    fragment_id: prev_fragment_info.fragment_id,
                    vnode_bitmap: curr_actors[actor_id].vnode_bitmap.clone(),
                    mview_definition: job_definition.clone(),
                    expr_context: Some(expr_context.clone()),
                    config_override: config_override.clone(),
                };
                (
                    *actor_id,
                    (
                        (
                            actor,
                            all_actor_dispatchers
                                .get(actor_id)
                                .cloned()
                                .unwrap_or_default(),
                        ),
                        curr_actors[actor_id].worker_id,
                    ),
                )
            })
            .collect();

    let actor_splits = curr_actors
        .iter()
        .map(|(&actor_id, info)| (actor_id, info.splits.clone()))
        .collect();

    let reschedule = Reschedule {
        added_actors,
        removed_actors,
        vnode_bitmap_updates,
        upstream_fragment_dispatcher_ids,
        upstream_dispatcher_mapping,
        downstream_fragment_ids,
        actor_splits,
        newly_created_actors,
    };

    Ok(reschedule)
}

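/// Turns a freshly rendered fragment layout into per-database `ReschedulePlan`s by diffing
/// every rendered fragment against its previous in-flight state and recomposing the
/// dispatchers towards each of its downstream fragments.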
pub(crate) fn build_reschedule_commands(
    render_result: FragmentRenderMap,
    context: RescheduleContext,
    all_prev_fragments: HashMap<FragmentId, &InflightFragmentInfo>,
) -> MetaResult<HashMap<DatabaseId, ReschedulePlan>> {
    if render_result.is_empty() {
        return Ok(HashMap::new());
    }

    let RescheduleContext {
        job_extra_info,
        upstream_fragments: mut all_upstream_fragments,
        downstream_fragments: mut all_downstream_fragments,
        mut downstream_relations,
        ..
    } = context;

    let fragment_ids = render_result
        .values()
        .flat_map(|jobs| jobs.values())
        .flatten()
        .map(|(fragment_id, _)| *fragment_id)
        .collect_vec();

    let all_related_fragment_ids: HashSet<_> = fragment_ids
        .iter()
        .copied()
        .chain(all_upstream_fragments.values().flatten().map(|(id, _)| *id))
        .chain(
            all_downstream_fragments
                .values()
                .flatten()
                .map(|(id, _)| *id),
        )
        .collect();

    for fragment_id in all_related_fragment_ids {
        if !all_prev_fragments.contains_key(&fragment_id) {
            return Err(MetaError::from(anyhow!(
                "previous fragment info for {fragment_id} not found"
            )));
        }
    }

    let all_rendered_fragments: HashMap<_, _> = render_result
        .values()
        .flat_map(|jobs| jobs.values())
        .flatten()
        .map(|(fragment_id, info)| (*fragment_id, info))
        .collect();

    let mut commands = HashMap::new();

    for (database_id, jobs) in &render_result {
        let mut all_fragment_actors = HashMap::new();
        let mut reschedules = HashMap::new();

        for (job_id, fragment_id, fragment_info) in jobs.iter().flat_map(|(job_id, fragments)| {
            fragments
                .iter()
                .map(move |(fragment_id, info)| (job_id, fragment_id, info))
        }) {
            let InflightFragmentInfo {
                distribution_type,
                actors,
                ..
            } = fragment_info;

            let upstream_fragments = all_upstream_fragments
                .remove(&(*fragment_id as FragmentId))
                .unwrap_or_default();
            let downstream_fragments = all_downstream_fragments
                .remove(&(*fragment_id as FragmentId))
                .unwrap_or_default();

            let fragment_actors: HashMap<_, _> = upstream_fragments
                .keys()
                .copied()
                .chain(downstream_fragments.keys().copied())
                .map(|fragment_id| {
                    all_prev_fragments
                        .get(&fragment_id)
                        .map(|fragment| {
                            (
                                fragment_id,
                                fragment.actors.keys().copied().collect::<HashSet<_>>(),
                            )
                        })
                        .ok_or_else(|| {
                            MetaError::from(anyhow!(
                                "fragment {} not found in previous state",
                                fragment_id
                            ))
                        })
                })
                .collect::<MetaResult<_>>()?;

            all_fragment_actors.extend(fragment_actors);

            let source_fragment_actors = actors
                .iter()
                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                .collect();

            let mut all_actor_dispatchers: HashMap<_, Vec<_>> = HashMap::new();

            for downstream_fragment_id in downstream_fragments.keys() {
                let target_fragment_actors =
                    match all_rendered_fragments.get(downstream_fragment_id) {
                        None => {
                            let external_fragment = all_prev_fragments
                                .get(downstream_fragment_id)
                                .ok_or_else(|| {
                                    MetaError::from(anyhow!(
                                        "fragment {} not found in previous state",
                                        downstream_fragment_id
                                    ))
                                })?;

                            external_fragment
                                .actors
                                .iter()
                                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                                .collect()
                        }
                        Some(downstream_rendered) => downstream_rendered
                            .actors
                            .iter()
                            .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                            .collect(),
                    };

                let target_fragment_distribution = *distribution_type;

                let fragment_relation::Model {
                    source_fragment_id: _,
                    target_fragment_id: _,
                    dispatcher_type,
                    dist_key_indices,
                    output_indices,
                    output_type_mapping,
                } = downstream_relations
                    .remove(&(
                        *fragment_id as FragmentId,
                        *downstream_fragment_id as FragmentId,
                    ))
                    .ok_or_else(|| {
                        MetaError::from(anyhow!(
                            "downstream relation missing for {} -> {}",
                            fragment_id,
                            downstream_fragment_id
                        ))
                    })?;

                let pb_mapping = PbDispatchOutputMapping {
                    indices: output_indices.into_u32_array(),
                    types: output_type_mapping.unwrap_or_default().to_protobuf(),
                };

                let dispatchers = compose_dispatchers(
                    *distribution_type,
                    &source_fragment_actors,
                    *downstream_fragment_id,
                    target_fragment_distribution,
                    &target_fragment_actors,
                    dispatcher_type,
                    dist_key_indices.into_u32_array(),
                    pb_mapping,
                );

                for (actor_id, dispatcher) in dispatchers {
                    all_actor_dispatchers
                        .entry(actor_id)
                        .or_default()
                        .push(dispatcher);
                }
            }

            let prev_fragment = all_prev_fragments.get(&{ *fragment_id }).ok_or_else(|| {
                MetaError::from(anyhow!(
                    "fragment {} not found in previous state",
                    fragment_id
                ))
            })?;

            let reschedule = diff_fragment(
                prev_fragment,
                actors,
                upstream_fragments,
                downstream_fragments,
                all_actor_dispatchers,
                job_extra_info.get(job_id),
            )?;

            reschedules.insert(*fragment_id as FragmentId, reschedule);
        }

        let command = ReschedulePlan {
            reschedules,
            fragment_actors: all_fragment_actors,
        };

        debug_assert!(
            command
                .reschedules
                .values()
                .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
            "reschedule plan carries vnode_bitmap_updates, expected full rebuild"
        );

        commands.insert(*database_id, command);
    }

    Ok(commands)
}

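/// The target parallelism to persist for a streaming job.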
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParallelismPolicy {
    pub parallelism: StreamingParallelism,
}

#[derive(Clone, Debug)]
pub struct ResourceGroupPolicy {
    pub resource_group: Option<String>,
}

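/// What to change for a job during a reschedule: its parallelism, its resource group,
/// or both at once.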
#[derive(Clone, Debug)]
pub enum ReschedulePolicy {
    Parallelism(ParallelismPolicy),
    ResourceGroup(ResourceGroupPolicy),
    Both(ParallelismPolicy, ResourceGroupPolicy),
}

impl GlobalStreamManager {
    #[await_tree::instrument("acquire_reschedule_read_guard")]
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    #[await_tree::instrument("acquire_reschedule_write_guard")]
    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }

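    /// Re-renders all reschedulable streaming jobs in batches and issues the resulting
    /// reschedule commands, so that actor placement catches up with the current workers and
    /// parallelism settings. Returns whether the control loop should run again on the next tick.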
    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
        tracing::info!("trigger parallelism control");

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_streaming_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        let unreschedulable_jobs = self
            .metadata_manager
            .collect_unreschedulable_backfill_jobs(&background_streaming_jobs, true)
            .await?;

        let database_objects: HashMap<risingwave_meta_model::DatabaseId, Vec<JobId>> = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_with_database()
            .await?;

        let job_ids = database_objects
            .iter()
            .flat_map(|(database_id, job_ids)| {
                job_ids
                    .iter()
                    .enumerate()
                    .map(move |(idx, job_id)| (idx, database_id, job_id))
            })
            .sorted_by(|(idx_a, database_a, _), (idx_b, database_b, _)| {
                idx_a.cmp(idx_b).then(database_a.cmp(database_b))
            })
            .map(|(_, database_id, job_id)| (*database_id, *job_id))
            .filter(|(_, job_id)| !unreschedulable_jobs.contains(job_id))
            .collect_vec();

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
            return Ok(false);
        }

        let active_workers =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
            return Ok(false);
        }

        tracing::info!(
            "trigger parallelism control for jobs: {:#?}, workers {:#?}",
            job_ids,
            active_workers.current()
        );

        let batch_size = match self.env.opts.parallelism_control_batch_size {
            0 => job_ids.len(),
            n => n,
        };

        tracing::info!(
            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
            job_ids.len(),
            batch_size,
            active_workers.current()
        );

        let batches: Vec<_> = job_ids
            .into_iter()
            .chunks(batch_size)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect();

        for batch in batches {
            let jobs = batch.iter().map(|(_, job_id)| *job_id).collect();

            let commands = self.scale_controller.rerender(jobs).await?;

            let futures = commands.into_iter().map(|(database_id, command)| {
                let barrier_scheduler = self.barrier_scheduler.clone();
                async move { barrier_scheduler.run_command(database_id, command).await }
            });

            let _results = future::try_join_all(futures).await?;
        }

        Ok(false)
    }

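    /// Background loop of the automatic parallelism monitor: it keeps a cache of streaming
    /// compute workers, watches local notifications (worker joins/leaves, system parameter
    /// changes, backfill completion), and periodically calls `trigger_parallelism_control`
    /// while a relevant change is pending.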
    async fn run(&self, mut shutdown_rx: Receiver<()>) {
        tracing::info!("starting automatic parallelism control monitor");

        let check_period =
            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

        let mut ticker = tokio::time::interval_at(
            Instant::now()
                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
            check_period,
        );
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        ticker.tick().await;

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        let mut should_trigger = false;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = %worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = %worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = %worker.id, "worker label changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = %worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = %prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = %worker.id, "worker not found in stream manager cache, but it was removed");
                                }
                            }
                        }

                        LocalNotification::StreamingJobBackfillFinished(job_id) => {
                            tracing::debug!(job_id = %job_id, "received backfill finished notification");
                            if let Err(e) = self.apply_post_backfill_parallelism(job_id).await {
                                tracing::warn!(job_id = %job_id, error = %e.as_report(), "failed to restore parallelism after backfill");
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

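    /// After a job finishes backfilling, compares its configured target parallelism with the
    /// backfill-time parallelism and, if they differ, submits a reschedule back to the target.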
    async fn apply_post_backfill_parallelism(&self, job_id: JobId) -> MetaResult<()> {
        let (target, backfill_parallelism) = self
            .metadata_manager
            .catalog_controller
            .get_job_parallelisms(job_id)
            .await?;

        match backfill_parallelism {
            Some(backfill_parallelism) if backfill_parallelism == target => {
                tracing::debug!(
                    job_id = %job_id,
                    ?backfill_parallelism,
                    ?target,
                    "backfill parallelism equals job parallelism, skip reschedule"
                );
                return Ok(());
            }
            Some(_) => {}
            None => {
                tracing::debug!(
                    job_id = %job_id,
                    ?target,
                    "no backfill parallelism configured, skip post-backfill reschedule"
                );
                return Ok(());
            }
        }

        tracing::info!(
            job_id = %job_id,
            ?target,
            ?backfill_parallelism,
            "restoring parallelism after backfill via reschedule"
        );
        let policy = ReschedulePolicy::Parallelism(ParallelismPolicy {
            parallelism: target,
        });
        if let Err(e) = self.reschedule_streaming_job(job_id, policy, false).await {
            tracing::warn!(job_id = %job_id, error = %e.as_report(), "reschedule after backfill failed");
            return Err(e);
        }

        tracing::info!(job_id = %job_id, "parallelism reschedule after backfill submitted");
        Ok(())
    }

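    /// Spawns the automatic parallelism monitor as a background task and returns its join
    /// handle together with a shutdown sender.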
    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}