1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use futures::future;
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::DatabaseId;
24use risingwave_common::hash::{ActorMapping, VnodeBitmapExt};
25use risingwave_connector::source::{SplitId, SplitMetaData};
26use risingwave_meta_model::{
27 StreamingParallelism, WorkerId, fragment, fragment_relation, object, streaming_job,
28};
29use risingwave_pb::common::{WorkerNode, WorkerType};
30use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher};
31use sea_orm::{ConnectionTrait, JoinType, QuerySelect, RelationTrait};
32use thiserror_ext::AsReport;
33use tokio::sync::oneshot::Receiver;
34use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
35use tokio::task::JoinHandle;
36use tokio::time::{Instant, MissedTickBehavior};
37
38use crate::barrier::{Command, Reschedule, RescheduleContext, ReschedulePlan};
39use crate::controller::scale::{
40 FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble,
41 find_fragment_no_shuffle_dags_detailed, load_fragment_context, load_fragment_context_for_jobs,
42};
43use crate::error::bail_invalid_parameter;
44use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager};
45use crate::model::{ActorId, FragmentId, StreamActor, StreamActorWithDispatchers};
46use crate::stream::{GlobalStreamManager, SourceManagerRef};
47use crate::{MetaError, MetaResult};
48
/// Per-worker actor-count delta for a reschedule request.
///
/// NOTE(review): presumably a positive value adds that many actors on the worker and a
/// negative value removes them — confirm against callers; no usage is visible in this chunk.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    /// Signed actor-count change, keyed by worker id.
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}
53
54use risingwave_common::id::JobId;
55use risingwave_meta_model::DispatcherType;
56use risingwave_meta_model::fragment::DistributionType;
57use risingwave_meta_model::prelude::{Fragment, FragmentRelation, StreamingJob};
58use sea_orm::ActiveValue::Set;
59use sea_orm::{
60 ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
61};
62
63use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
64use crate::controller::utils::{
65 StreamingJobExtraInfo, compose_dispatchers, get_streaming_job_extra_info,
66};
67
/// Shared, reference-counted handle to a [`ScaleController`].
pub type ScaleControllerRef = Arc<ScaleController>;
69
/// Identity-free summary of one actor, used to compare fragment layouts.
///
/// The actor id is deliberately omitted, so a re-render that only renumbers actors compares
/// equal. Only split ids are kept, not split offsets. Field order matters: the derived `Ord`
/// sorts by worker, then vnodes, then splits. `build_normalized_fragment_layout` sorts actors
/// with it to obtain an order-independent representation.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct NormalizedActor {
    /// Worker the actor is placed on.
    worker_id: WorkerId,
    /// Indices of the vnodes owned by the actor, or `None` when the actor has no vnode bitmap.
    vnode_bitmap: Option<Vec<usize>>,
    /// Sorted ids of the source splits assigned to the actor.
    splits: Vec<SplitId>,
}
79
/// Comparable summary of a fragment's physical layout.
///
/// Two fragments with equal layouts have the same distribution type and the same multiset of
/// (worker, vnodes, split ids) actor placements, regardless of actor ids or split offsets.
#[derive(Clone, Debug, PartialEq, Eq)]
struct NormalizedFragmentLayout {
    /// Distribution type of the fragment.
    distribution_type: DistributionType,
    /// Normalized actors, sorted so that comparison does not depend on map iteration order.
    actors: Vec<NormalizedActor>,
}
85
86fn normalize_actor_info(info: &InflightActorInfo) -> NormalizedActor {
87 let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
88 bitmap
89 .iter_vnodes()
90 .map(|vnode| vnode.to_index())
91 .collect_vec()
92 });
93 let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
94 splits.sort_unstable();
95 NormalizedActor {
96 worker_id: info.worker_id,
97 vnode_bitmap,
98 splits,
99 }
100}
101
102fn build_normalized_fragment_layout(
103 fragment_info: &InflightFragmentInfo,
104) -> NormalizedFragmentLayout {
105 let mut actors = fragment_info
109 .actors
110 .values()
111 .map(normalize_actor_info)
112 .collect_vec();
113 actors.sort_unstable();
114
115 NormalizedFragmentLayout {
116 distribution_type: fragment_info.distribution_type,
117 actors,
118 }
119}
120
121pub(crate) fn rendered_layout_matches_current(
136 render_result: &FragmentRenderMap,
137 all_prev_fragments: &HashMap<FragmentId, &InflightFragmentInfo>,
138) -> MetaResult<bool> {
139 let all_rendered_fragments: HashMap<_, _> = render_result
140 .values()
141 .flat_map(|jobs| jobs.values())
142 .flatten()
143 .map(|(fragment_id, info)| (*fragment_id, info))
144 .collect();
145
146 for (fragment_id, rendered_fragment) in &all_rendered_fragments {
147 let Some(prev_fragment) = all_prev_fragments.get(fragment_id).copied() else {
148 return Err(MetaError::from(anyhow!(
149 "previous fragment info for {} not found",
150 fragment_id
151 )));
152 };
153
154 let rendered_layout = build_normalized_fragment_layout(rendered_fragment);
155 let current_layout = build_normalized_fragment_layout(prev_fragment);
156
157 if rendered_layout != current_layout {
158 return Ok(false);
159 }
160 }
161
162 Ok(true)
163}
164
/// Coordinates rescheduling (parallelism, resource group and fragment-level changes) of
/// streaming jobs by updating the catalog and producing reschedule commands.
pub struct ScaleController {
    /// Metadata access. Its `catalog_controller` lock and database are used by the reschedule
    /// methods.
    pub metadata_manager: MetadataManager,

    /// Source manager handle. It is not read by the code visible in this chunk.
    pub source_manager: SourceManagerRef,

    /// Meta-service environment.
    pub env: MetaSrvEnv,

    /// Async reader-writer lock serializing reschedule operations.
    ///
    /// NOTE(review): presumably acquired through `GlobalStreamManager::reschedule_lock_read_guard`
    /// and a matching write-guard helper — confirm which operations take which side.
    pub reschedule_lock: RwLock<()>,
}
176
impl ScaleController {
    /// Creates a controller that holds a cloned handle of `metadata_manager`, the given source
    /// manager and environment, and a fresh, unlocked reschedule lock.
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    /// Returns the ids of every streaming job that owns a fragment in a no-shuffle ensemble
    /// reached from the fragments of `jobs`.
    ///
    /// Runs under the catalog read lock. The transaction is only used for reads and is dropped
    /// without being committed.
    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[JobId],
    ) -> MetaResult<HashSet<JobId>> {
        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        // All fragment ids belonging to the requested jobs.
        let fragment_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(jobs.to_vec()))
            .into_tuple()
            .all(&txn)
            .await?;
        // Expand to every fragment of the no-shuffle ensembles those fragments belong to.
        let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
        let related_fragments = ensembles
            .iter()
            .flat_map(|ensemble| ensemble.fragments())
            .collect_vec();

        // Map the related fragments back to their owning jobs; the set collapses duplicates.
        let job_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(related_fragments))
            .into_tuple()
            .all(&txn)
            .await?;

        let job_ids = job_ids.into_iter().collect();

        Ok(job_ids)
    }

    /// Persists new parallelism and/or resource-group settings for the given streaming jobs.
    /// It then returns one `Command::RescheduleIntent` per affected database.
    ///
    /// Runs under the catalog write lock in a single transaction. Catalog updates and building
    /// the intents both happen before `txn.commit()`, so an error returns early without
    /// committing. NOTE(review): this relies on the uncommitted transaction being rolled back on
    /// drop.
    ///
    /// For a fixed parallelism the adaptive strategy is cleared. Otherwise the policy's
    /// `adaptive_parallelism_strategy` is stored.
    ///
    /// # Errors
    /// - A catalog-not-found error if a job id does not exist.
    /// - An error if a fixed parallelism exceeds the job's `max_parallelism`.
    pub async fn reschedule_inplace(
        &self,
        policy: HashMap<JobId, ReschedulePolicy>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        for (table_id, target) in &policy {
            let streaming_job = StreamingJob::find_by_id(*table_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;

            let max_parallelism = streaming_job.max_parallelism;

            let mut streaming_job = streaming_job.into_active_model();

            // Parallelism part of the policy (also present in `Both`).
            match &target {
                ReschedulePolicy::Parallelism(p) | ReschedulePolicy::Both(p, _) => {
                    if let StreamingParallelism::Fixed(n) = p.parallelism
                        && n > max_parallelism as usize
                    {
                        bail!(format!(
                            "specified parallelism {n} should not exceed max parallelism {max_parallelism}"
                        ));
                    }

                    streaming_job.parallelism = Set(p.parallelism.clone());
                    // A fixed parallelism has no adaptive strategy.
                    if matches!(p.parallelism, StreamingParallelism::Fixed(_)) {
                        streaming_job.adaptive_parallelism_strategy = Set(None);
                    } else {
                        streaming_job.adaptive_parallelism_strategy =
                            Set(p.adaptive_parallelism_strategy.clone());
                    }
                }
                _ => {}
            }

            // Resource-group part of the policy (also present in `Both`).
            match &target {
                ReschedulePolicy::ResourceGroup(r) | ReschedulePolicy::Both(_, r) => {
                    streaming_job.specific_resource_group = Set(r.resource_group.clone());
                }
                _ => {}
            }

            StreamingJob::update(streaming_job).exec(&txn).await?;
        }

        // Build the intents inside the same transaction so they observe the updated catalog.
        let job_ids: HashSet<JobId> = policy.keys().copied().collect();
        let commands = build_reschedule_intent_for_jobs(&txn, job_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }

    /// Persists (or clears, for `None`) the backfill parallelism of the given streaming jobs.
    /// It then returns one `Command::RescheduleIntent` per affected database.
    ///
    /// Returns an empty map without locking when `policy` is empty. Otherwise it follows the
    /// same locking/transaction pattern as `reschedule_inplace`. A fixed backfill parallelism
    /// clears the backfill adaptive strategy. Clearing the policy clears both backfill columns.
    ///
    /// # Errors
    /// - A catalog-not-found error if a job id does not exist.
    /// - An error if a fixed backfill parallelism exceeds the job's `max_parallelism`.
    pub async fn reschedule_backfill_parallelism_inplace(
        &self,
        policy: HashMap<JobId, Option<ParallelismPolicy>>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        if policy.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        for (table_id, policy) in &policy {
            let streaming_job = StreamingJob::find_by_id(*table_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;

            let max_parallelism = streaming_job.max_parallelism;

            let mut streaming_job = streaming_job.into_active_model();

            if let Some(ParallelismPolicy {
                parallelism: StreamingParallelism::Fixed(n),
                ..
            }) = policy
                && *n > max_parallelism as usize
            {
                bail!(format!(
                    "specified backfill parallelism {n} should not exceed max parallelism {max_parallelism}"
                ));
            }

            if let Some(policy) = policy {
                streaming_job.backfill_parallelism = Set(Some(policy.parallelism.clone()));
                if matches!(policy.parallelism, StreamingParallelism::Fixed(_)) {
                    streaming_job.backfill_adaptive_parallelism_strategy = Set(None);
                } else {
                    streaming_job.backfill_adaptive_parallelism_strategy =
                        Set(policy.adaptive_parallelism_strategy.clone());
                }
            } else {
                // `None` clears any backfill-specific override.
                streaming_job.backfill_parallelism = Set(None);
                streaming_job.backfill_adaptive_parallelism_strategy = Set(None);
            }
            streaming_job.update(&txn).await?;
        }

        let jobs = policy.keys().copied().collect();

        let command = build_reschedule_intent_for_jobs(&txn, jobs).await?;

        txn.commit().await?;

        Ok(command)
    }

    /// Sets the parallelism of specific fragments and returns reschedule intents for every
    /// no-shuffle ensemble whose parallelism actually changed.
    ///
    /// Only entry fragments of an ensemble may appear in `policy`. All entry fragments of one
    /// ensemble must agree on the requested parallelism, and the new value is written to every
    /// entry fragment of the ensemble. Ensembles whose current parallelism already equals the
    /// request are skipped. If none change, the transaction is committed and an empty map is
    /// returned.
    ///
    /// # Errors
    /// - A catalog-not-found error if any fragment id does not exist.
    /// - An invalid-parameter error if an ensemble reached from `policy` has no entry fragment
    ///   in `policy`.
    /// - An error if entry fragments of one ensemble request different parallelisms.
    pub async fn reschedule_fragment_inplace(
        &self,
        policy: HashMap<risingwave_meta_model::FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        if policy.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        let fragment_id_list = policy.keys().copied().collect_vec();

        // Reject the whole request if any requested fragment does not exist.
        let existing_fragment_ids: HashSet<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
            .into_tuple::<risingwave_meta_model::FragmentId>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        if let Some(missing_fragment_id) = fragment_id_list
            .iter()
            .find(|fragment_id| !existing_fragment_ids.contains(*fragment_id))
        {
            return Err(MetaError::catalog_id_not_found(
                "fragment",
                *missing_fragment_id,
            ));
        }

        let mut target_ensembles = vec![];

        for ensemble in find_fragment_no_shuffle_dags_detailed(&txn, &fragment_id_list).await? {
            let entry_fragment_ids = ensemble.entry_fragments().collect_vec();

            // `dedup` collapses consecutive equal requests. A single remaining value means all
            // requested entry fragments agree; more than one means a conflict.
            let desired_parallelism = match entry_fragment_ids
                .iter()
                .filter_map(|fragment_id| policy.get(fragment_id).cloned())
                .dedup()
                .collect_vec()
                .as_slice()
            {
                [] => {
                    bail_invalid_parameter!(
                        "none of the entry fragments {:?} were included in the reschedule request; \
                         provide at least one entry fragment id",
                        entry_fragment_ids
                    );
                }
                [parallelism] => parallelism.clone(),
                parallelisms => {
                    bail!(
                        "conflicting reschedule policies for fragments in the same no-shuffle ensemble: {:?}",
                        parallelisms
                    );
                }
            };

            let fragments = Fragment::find()
                .filter(fragment::Column::FragmentId.is_in(entry_fragment_ids))
                .all(&txn)
                .await?;

            debug_assert!(
                fragments
                    .iter()
                    .map(|fragment| fragment.parallelism.as_ref())
                    .all_equal(),
                "entry fragments in the same ensemble should share the same parallelism"
            );

            let current_parallelism = fragments
                .first()
                .and_then(|fragment| fragment.parallelism.clone());

            // Nothing to do for this ensemble.
            if current_parallelism == desired_parallelism {
                continue;
            }

            for fragment in fragments {
                let mut fragment = fragment.into_active_model();
                fragment.parallelism = Set(desired_parallelism.clone());
                Fragment::update(fragment).exec(&txn).await?;
            }

            target_ensembles.push(ensemble);
        }

        if target_ensembles.is_empty() {
            txn.commit().await?;
            return Ok(HashMap::new());
        }

        // Reschedule every fragment of the changed ensembles, not only the entry fragments.
        let target_fragment_ids: HashSet<FragmentId> = target_ensembles
            .iter()
            .flat_map(|ensemble| ensemble.component_fragments())
            .collect();
        let commands = build_reschedule_intent_for_fragments(&txn, target_fragment_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }

    /// Builds reschedule intents for `jobs` from the current catalog without modifying it.
    /// It takes the catalog read lock.
    ///
    /// NOTE(review): no caller is visible in this chunk.
    async fn rerender(&self, jobs: HashSet<JobId>) -> MetaResult<HashMap<DatabaseId, Command>> {
        if jobs.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;
        let commands = build_reschedule_intent_for_jobs(&txn, jobs).await?;
        txn.commit().await?;
        Ok(commands)
    }
}
455
456async fn build_reschedule_intent_for_jobs(
457 txn: &impl ConnectionTrait,
458 job_ids: HashSet<JobId>,
459) -> MetaResult<HashMap<DatabaseId, Command>> {
460 if job_ids.is_empty() {
461 return Ok(HashMap::new());
462 }
463
464 let job_id_list = job_ids.iter().copied().collect_vec();
465 let database_jobs: Vec<(DatabaseId, JobId)> = StreamingJob::find()
466 .select_only()
467 .column(object::Column::DatabaseId)
468 .column(streaming_job::Column::JobId)
469 .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
470 .filter(streaming_job::Column::JobId.is_in(job_id_list.clone()))
471 .into_tuple()
472 .all(txn)
473 .await?;
474
475 if database_jobs.len() != job_ids.len() {
476 let returned_jobs: HashSet<JobId> =
477 database_jobs.iter().map(|(_, job_id)| *job_id).collect();
478 let missing = job_ids.difference(&returned_jobs).copied().collect_vec();
479 return Err(MetaError::catalog_id_not_found(
480 "streaming job",
481 format!("{missing:?}"),
482 ));
483 }
484
485 let reschedule_context = load_reschedule_context_for_jobs(txn, job_ids).await?;
486 if reschedule_context.is_empty() {
487 return Ok(HashMap::new());
488 }
489
490 let commands = reschedule_context
491 .into_database_contexts()
492 .into_iter()
493 .map(|(database_id, context)| {
494 (
495 database_id,
496 Command::RescheduleIntent {
497 context,
498 reschedule_plan: None,
499 },
500 )
501 })
502 .collect();
503
504 Ok(commands)
505}
506
507async fn build_reschedule_intent_for_fragments(
508 txn: &impl ConnectionTrait,
509 fragment_ids: HashSet<FragmentId>,
510) -> MetaResult<HashMap<DatabaseId, Command>> {
511 if fragment_ids.is_empty() {
512 return Ok(HashMap::new());
513 }
514
515 let fragment_id_list = fragment_ids.iter().copied().collect_vec();
516 let fragment_databases: Vec<(FragmentId, DatabaseId)> = Fragment::find()
517 .select_only()
518 .column(fragment::Column::FragmentId)
519 .column(object::Column::DatabaseId)
520 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
521 .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
522 .into_tuple()
523 .all(txn)
524 .await?;
525
526 if fragment_databases.len() != fragment_ids.len() {
527 let returned: HashSet<FragmentId> = fragment_databases
528 .iter()
529 .map(|(fragment_id, _)| *fragment_id)
530 .collect();
531 let missing = fragment_ids.difference(&returned).copied().collect_vec();
532 return Err(MetaError::catalog_id_not_found(
533 "fragment",
534 format!("{missing:?}"),
535 ));
536 }
537
538 let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_id_list).await?;
539 let reschedule_context = load_reschedule_context_for_ensembles(txn, ensembles).await?;
540 if reschedule_context.is_empty() {
541 return Ok(HashMap::new());
542 }
543
544 let commands = reschedule_context
545 .into_database_contexts()
546 .into_iter()
547 .map(|(database_id, context)| {
548 (
549 database_id,
550 Command::RescheduleIntent {
551 context,
552 reschedule_plan: None,
553 },
554 )
555 })
556 .collect();
557
558 Ok(commands)
559}
560
561async fn load_reschedule_context_for_jobs(
562 txn: &impl ConnectionTrait,
563 job_ids: HashSet<JobId>,
564) -> MetaResult<RescheduleContext> {
565 let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
566 build_reschedule_context_from_loaded(txn, loaded).await
567}
568
569async fn load_reschedule_context_for_ensembles(
570 txn: &impl ConnectionTrait,
571 ensembles: Vec<NoShuffleEnsemble>,
572) -> MetaResult<RescheduleContext> {
573 let loaded = load_fragment_context(txn, ensembles).await?;
574 build_reschedule_context_from_loaded(txn, loaded).await
575}
576
577async fn build_reschedule_context_from_loaded(
578 txn: &impl ConnectionTrait,
579 loaded: LoadedFragmentContext,
580) -> MetaResult<RescheduleContext> {
581 if loaded.is_empty() {
582 return Ok(RescheduleContext::empty());
583 }
584
585 let job_ids = loaded.job_map.keys().copied().collect_vec();
586 let job_extra_info = get_streaming_job_extra_info(txn, job_ids).await?;
587
588 let fragment_ids = loaded
589 .job_fragments
590 .values()
591 .flat_map(|fragments| fragments.keys().copied())
592 .collect_vec();
593
594 let upstreams: Vec<(FragmentId, FragmentId, DispatcherType)> = FragmentRelation::find()
595 .select_only()
596 .columns([
597 fragment_relation::Column::TargetFragmentId,
598 fragment_relation::Column::SourceFragmentId,
599 fragment_relation::Column::DispatcherType,
600 ])
601 .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
602 .into_tuple()
603 .all(txn)
604 .await?;
605
606 let mut upstream_fragments = HashMap::new();
607 for (fragment, upstream, dispatcher) in upstreams {
608 upstream_fragments
609 .entry(fragment as FragmentId)
610 .or_insert(HashMap::new())
611 .insert(upstream as FragmentId, dispatcher);
612 }
613
614 let downstreams = FragmentRelation::find()
615 .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids.clone()))
616 .all(txn)
617 .await?;
618
619 let mut downstream_fragments = HashMap::new();
620 let mut downstream_relations = HashMap::new();
621 for relation in downstreams {
622 let source_fragment_id = relation.source_fragment_id as FragmentId;
623 let target_fragment_id = relation.target_fragment_id as FragmentId;
624 downstream_fragments
625 .entry(source_fragment_id)
626 .or_insert(HashMap::new())
627 .insert(target_fragment_id, relation.dispatcher_type);
628 downstream_relations.insert((source_fragment_id, target_fragment_id), relation);
629 }
630
631 Ok(RescheduleContext {
632 loaded,
633 job_extra_info,
634 upstream_fragments,
635 downstream_fragments,
636 downstream_relations,
637 })
638}
639
640fn diff_fragment(
652 prev_fragment_info: &InflightFragmentInfo,
653 curr_actors: &HashMap<ActorId, InflightActorInfo>,
654 upstream_fragments: HashMap<FragmentId, DispatcherType>,
655 downstream_fragments: HashMap<FragmentId, DispatcherType>,
656 all_actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
657 job_extra_info: Option<&StreamingJobExtraInfo>,
658) -> MetaResult<Reschedule> {
659 let prev_ids: HashSet<_> = prev_fragment_info.actors.keys().cloned().collect();
660 let curr_ids: HashSet<_> = curr_actors.keys().cloned().collect();
661
662 let removed_actors: HashSet<_> = &prev_ids - &curr_ids;
663 let added_actor_ids: HashSet<_> = &curr_ids - &prev_ids;
664 let kept_ids: HashSet<_> = prev_ids.intersection(&curr_ids).cloned().collect();
665 debug_assert!(
666 kept_ids.is_empty(),
667 "kept actors found in scale; expected full rebuild, prev={prev_ids:?}, curr={curr_ids:?}, kept={kept_ids:?}"
668 );
669
670 let mut added_actors = HashMap::new();
671 for &actor_id in &added_actor_ids {
672 let InflightActorInfo { worker_id, .. } = curr_actors
673 .get(&actor_id)
674 .ok_or_else(|| anyhow!("BUG: Worker not found for new actor {}", actor_id))?;
675
676 added_actors
677 .entry(*worker_id)
678 .or_insert_with(Vec::new)
679 .push(actor_id);
680 }
681
682 let mut vnode_bitmap_updates = HashMap::new();
683 for actor_id in kept_ids {
684 let prev_actor = &prev_fragment_info.actors[&actor_id];
685 let curr_actor = &curr_actors[&actor_id];
686
687 if prev_actor.vnode_bitmap != curr_actor.vnode_bitmap
689 && let Some(bitmap) = curr_actor.vnode_bitmap.clone()
690 {
691 vnode_bitmap_updates.insert(actor_id, bitmap);
692 }
693 }
694
695 let upstream_dispatcher_mapping =
696 if let DistributionType::Hash = prev_fragment_info.distribution_type {
697 let actor_mapping = curr_actors
698 .iter()
699 .map(
700 |(
701 actor_id,
702 InflightActorInfo {
703 worker_id: _,
704 vnode_bitmap,
705 ..
706 },
707 )| { (*actor_id, vnode_bitmap.clone().unwrap()) },
708 )
709 .collect();
710 Some(ActorMapping::from_bitmaps(&actor_mapping))
711 } else {
712 None
713 };
714
715 let upstream_fragment_dispatcher_ids = upstream_fragments
716 .iter()
717 .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
718 .map(|(upstream_fragment, _)| (*upstream_fragment, prev_fragment_info.fragment_id))
719 .collect();
720
721 let downstream_fragment_ids = downstream_fragments
722 .iter()
723 .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
724 .map(|(fragment_id, _)| *fragment_id)
725 .collect();
726
727 let extra_info = job_extra_info.cloned().unwrap_or_default();
728 let expr_context = extra_info.stream_context().to_expr_context();
729 let job_definition = extra_info.job_definition;
730 let config_override = extra_info.config_override;
731
732 let newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)> =
733 added_actor_ids
734 .iter()
735 .map(|actor_id| {
736 let actor = StreamActor {
737 actor_id: *actor_id,
738 fragment_id: prev_fragment_info.fragment_id,
739 vnode_bitmap: curr_actors[actor_id].vnode_bitmap.clone(),
740 mview_definition: job_definition.clone(),
741 expr_context: Some(expr_context.clone()),
742 config_override: config_override.clone(),
743 };
744 (
745 *actor_id,
746 (
747 (
748 actor,
749 all_actor_dispatchers
750 .get(actor_id)
751 .cloned()
752 .unwrap_or_default(),
753 ),
754 curr_actors[actor_id].worker_id,
755 ),
756 )
757 })
758 .collect();
759
760 let actor_splits = curr_actors
761 .iter()
762 .map(|(&actor_id, info)| (actor_id, info.splits.clone()))
763 .collect();
764
765 let reschedule = Reschedule {
766 added_actors,
767 removed_actors,
768 vnode_bitmap_updates,
769 upstream_fragment_dispatcher_ids,
770 upstream_dispatcher_mapping,
771 downstream_fragment_ids,
772 actor_splits,
773 newly_created_actors,
774 };
775
776 Ok(reschedule)
777}
778
779pub(crate) fn build_reschedule_commands(
784 render_result: FragmentRenderMap,
785 context: RescheduleContext,
786 all_prev_fragments: HashMap<FragmentId, &InflightFragmentInfo>,
787) -> MetaResult<HashMap<DatabaseId, ReschedulePlan>> {
788 if render_result.is_empty() {
789 return Ok(HashMap::new());
790 }
791
792 let RescheduleContext {
793 job_extra_info,
794 upstream_fragments: mut all_upstream_fragments,
795 downstream_fragments: mut all_downstream_fragments,
796 mut downstream_relations,
797 ..
798 } = context;
799
800 let fragment_ids = render_result
801 .values()
802 .flat_map(|jobs| jobs.values())
803 .flatten()
804 .map(|(fragment_id, _)| *fragment_id)
805 .collect_vec();
806
807 let all_related_fragment_ids: HashSet<_> = fragment_ids
808 .iter()
809 .copied()
810 .chain(all_upstream_fragments.values().flatten().map(|(id, _)| *id))
811 .chain(
812 all_downstream_fragments
813 .values()
814 .flatten()
815 .map(|(id, _)| *id),
816 )
817 .collect();
818
819 for fragment_id in all_related_fragment_ids {
820 if !all_prev_fragments.contains_key(&fragment_id) {
821 return Err(MetaError::from(anyhow!(
822 "previous fragment info for {fragment_id} not found"
823 )));
824 }
825 }
826
827 let all_rendered_fragments: HashMap<_, _> = render_result
828 .values()
829 .flat_map(|jobs| jobs.values())
830 .flatten()
831 .map(|(fragment_id, info)| (*fragment_id, info))
832 .collect();
833
834 let mut commands = HashMap::new();
835
836 for (database_id, jobs) in &render_result {
837 let mut all_fragment_actors = HashMap::new();
838 let mut reschedules = HashMap::new();
839
840 for (job_id, fragment_id, fragment_info) in jobs.iter().flat_map(|(job_id, fragments)| {
841 fragments
842 .iter()
843 .map(move |(fragment_id, info)| (job_id, fragment_id, info))
844 }) {
845 let InflightFragmentInfo {
846 distribution_type,
847 actors,
848 ..
849 } = fragment_info;
850
851 let upstream_fragments = all_upstream_fragments
852 .remove(&(*fragment_id as FragmentId))
853 .unwrap_or_default();
854 let downstream_fragments = all_downstream_fragments
855 .remove(&(*fragment_id as FragmentId))
856 .unwrap_or_default();
857
858 let fragment_actors: HashMap<_, _> = upstream_fragments
859 .keys()
860 .copied()
861 .chain(downstream_fragments.keys().copied())
862 .map(|fragment_id| {
863 all_prev_fragments
864 .get(&fragment_id)
865 .map(|fragment| {
866 (
867 fragment_id,
868 fragment.actors.keys().copied().collect::<HashSet<_>>(),
869 )
870 })
871 .ok_or_else(|| {
872 MetaError::from(anyhow!(
873 "fragment {} not found in previous state",
874 fragment_id
875 ))
876 })
877 })
878 .collect::<MetaResult<_>>()?;
879
880 all_fragment_actors.extend(fragment_actors);
881
882 let source_fragment_actors = actors
883 .iter()
884 .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
885 .collect();
886
887 let mut all_actor_dispatchers: HashMap<_, Vec<_>> = HashMap::new();
888
889 for downstream_fragment_id in downstream_fragments.keys() {
890 let target_fragment_actors =
891 match all_rendered_fragments.get(downstream_fragment_id) {
892 None => {
893 let external_fragment = all_prev_fragments
894 .get(downstream_fragment_id)
895 .ok_or_else(|| {
896 MetaError::from(anyhow!(
897 "fragment {} not found in previous state",
898 downstream_fragment_id
899 ))
900 })?;
901
902 external_fragment
903 .actors
904 .iter()
905 .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
906 .collect()
907 }
908 Some(downstream_rendered) => downstream_rendered
909 .actors
910 .iter()
911 .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
912 .collect(),
913 };
914
915 let target_fragment_distribution = *distribution_type;
916
917 let fragment_relation::Model {
918 source_fragment_id: _,
919 target_fragment_id: _,
920 dispatcher_type,
921 dist_key_indices,
922 output_indices,
923 output_type_mapping,
924 } = downstream_relations
925 .remove(&(
926 *fragment_id as FragmentId,
927 *downstream_fragment_id as FragmentId,
928 ))
929 .ok_or_else(|| {
930 MetaError::from(anyhow!(
931 "downstream relation missing for {} -> {}",
932 fragment_id,
933 downstream_fragment_id
934 ))
935 })?;
936
937 let pb_mapping = PbDispatchOutputMapping {
938 indices: output_indices.into_u32_array(),
939 types: output_type_mapping.unwrap_or_default().to_protobuf(),
940 };
941
942 let (dispatchers, _) = compose_dispatchers(
943 *distribution_type,
944 &source_fragment_actors,
945 *downstream_fragment_id,
946 target_fragment_distribution,
947 &target_fragment_actors,
948 dispatcher_type,
949 dist_key_indices.into_u32_array(),
950 pb_mapping,
951 );
952
953 for (actor_id, dispatcher) in dispatchers {
954 all_actor_dispatchers
955 .entry(actor_id)
956 .or_default()
957 .push(dispatcher);
958 }
959 }
960
961 let prev_fragment = all_prev_fragments.get(&{ *fragment_id }).ok_or_else(|| {
962 MetaError::from(anyhow!(
963 "fragment {} not found in previous state",
964 fragment_id
965 ))
966 })?;
967
968 let reschedule = diff_fragment(
969 prev_fragment,
970 actors,
971 upstream_fragments,
972 downstream_fragments,
973 all_actor_dispatchers,
974 job_extra_info.get(job_id),
975 )?;
976
977 reschedules.insert(*fragment_id as FragmentId, reschedule);
978 }
979
980 let command = ReschedulePlan {
981 reschedules,
982 fragment_actors: all_fragment_actors,
983 };
984
985 debug_assert!(
986 command
987 .reschedules
988 .values()
989 .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
990 "reschedule plan carries vnode_bitmap_updates, expected full rebuild"
991 );
992
993 commands.insert(*database_id, command);
994 }
995
996 Ok(commands)
997}
998
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_connector::source::SplitImpl;
    use risingwave_connector::source::test_source::TestSourceSplit;

    use super::*;

    /// Actor on `worker_id` with the given vnode bitmap and no splits.
    fn actor(worker_id: impl Into<WorkerId>, vnode_bitmap: Option<Bitmap>) -> InflightActorInfo {
        InflightActorInfo {
            worker_id: worker_id.into(),
            vnode_bitmap,
            splits: vec![],
        }
    }

    /// Actor on `worker_id` with the given vnode bitmap and source splits.
    fn actor_with_splits(
        worker_id: impl Into<WorkerId>,
        vnode_bitmap: Option<Bitmap>,
        splits: Vec<SplitImpl>,
    ) -> InflightActorInfo {
        InflightActorInfo {
            worker_id: worker_id.into(),
            vnode_bitmap,
            splits,
        }
    }

    /// Test split with the given id and offset.
    fn test_split(id: &str, offset: &str) -> SplitImpl {
        SplitImpl::Test(TestSourceSplit {
            id: id.into(),
            properties: HashMap::new(),
            offset: offset.to_owned(),
        })
    }

    /// Hash-distributed fragment with 8 vnodes and the given actors.
    fn fragment(
        fragment_id: FragmentId,
        actors: impl IntoIterator<Item = (ActorId, InflightActorInfo)>,
    ) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 8,
            nodes: Default::default(),
            actors: actors.into_iter().collect(),
            state_table_ids: HashSet::new(),
        }
    }

    /// Render result holding `fragments` under database 1 / job 1.
    fn render_result(fragments: Vec<(FragmentId, InflightFragmentInfo)>) -> FragmentRenderMap {
        HashMap::from([(
            DatabaseId::new(1),
            HashMap::from([(
                JobId::new(1),
                fragments.into_iter().collect::<HashMap<_, _>>(),
            )]),
        )])
    }

    #[test]
    fn rendered_layout_matches_current_ignores_actor_ids_across_fragments() {
        let current_source = fragment(
            FragmentId::new(1),
            [
                (
                    ActorId::new(101),
                    actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                ),
                (
                    ActorId::new(102),
                    actor(2, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                ),
            ],
        );
        let current_target = fragment(
            FragmentId::new(2),
            [
                (
                    ActorId::new(201),
                    actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                ),
                (
                    ActorId::new(202),
                    actor(4, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                ),
            ],
        );

        let rendered = render_result(vec![
            (
                FragmentId::new(1),
                fragment(
                    FragmentId::new(1),
                    [
                        (
                            ActorId::new(1001),
                            actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                        ),
                        (
                            ActorId::new(1002),
                            actor(2, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                        ),
                    ],
                ),
            ),
            (
                FragmentId::new(2),
                fragment(
                    FragmentId::new(2),
                    [
                        (
                            ActorId::new(2001),
                            actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                        ),
                        (
                            ActorId::new(2002),
                            actor(4, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                        ),
                    ],
                ),
            ),
        ]);

        let prev = HashMap::from([
            (FragmentId::new(1), &current_source),
            (FragmentId::new(2), &current_target),
        ]);

        assert!(rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    #[test]
    fn rendered_layout_matches_current_detects_target_fragment_layout_changes() {
        let current_source = fragment(
            FragmentId::new(1),
            [
                (
                    ActorId::new(101),
                    actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                ),
                (
                    ActorId::new(102),
                    actor(2, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                ),
            ],
        );
        let current_target = fragment(
            FragmentId::new(2),
            [
                (
                    ActorId::new(201),
                    actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                ),
                (
                    ActorId::new(202),
                    actor(4, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                ),
            ],
        );

        let rendered = render_result(vec![
            (
                FragmentId::new(1),
                fragment(
                    FragmentId::new(1),
                    [
                        (
                            ActorId::new(1001),
                            actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                        ),
                        (
                            ActorId::new(1002),
                            actor(2, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                        ),
                    ],
                ),
            ),
            (
                FragmentId::new(2),
                fragment(
                    FragmentId::new(2),
                    [
                        (
                            ActorId::new(2001),
                            actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3, 4, 5]))),
                        ),
                        (
                            ActorId::new(2002),
                            actor(4, Some(Bitmap::from_indices(8, &[6, 7]))),
                        ),
                    ],
                ),
            ),
        ]);

        let prev = HashMap::from([
            (FragmentId::new(1), &current_source),
            (FragmentId::new(2), &current_target),
        ]);

        assert!(!rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    #[test]
    fn rendered_layout_matches_current_ignores_split_offsets() {
        let fragment_id = FragmentId::new(1);
        let prev_fragment = fragment(
            fragment_id,
            [(
                ActorId::new(101),
                actor_with_splits(
                    1,
                    Some(Bitmap::from_indices(8, &[0, 1, 2, 3])),
                    vec![test_split("split-0", "100")],
                ),
            )],
        );
        let rendered = render_result(vec![(
            fragment_id,
            fragment(
                fragment_id,
                [(
                    ActorId::new(1001),
                    actor_with_splits(
                        1,
                        Some(Bitmap::from_indices(8, &[0, 1, 2, 3])),
                        vec![test_split("split-0", "200")],
                    ),
                )],
            ),
        )]);
        let prev = HashMap::from([(fragment_id, &prev_fragment)]);

        assert!(rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    #[test]
    fn rendered_layout_matches_current_detects_split_id_changes() {
        let fragment_id = FragmentId::new(1);
        let prev_fragment = fragment(
            fragment_id,
            [(
                ActorId::new(101),
                actor_with_splits(
                    1,
                    Some(Bitmap::from_indices(8, &[0, 1, 2, 3])),
                    vec![test_split("split-0", "100")],
                ),
            )],
        );
        let rendered = render_result(vec![(
            fragment_id,
            fragment(
                fragment_id,
                [(
                    ActorId::new(1001),
                    actor_with_splits(
                        1,
                        Some(Bitmap::from_indices(8, &[0, 1, 2, 3])),
                        vec![test_split("split-1", "100")],
                    ),
                )],
            ),
        )]);
        let prev = HashMap::from([(fragment_id, &prev_fragment)]);

        assert!(!rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    #[test]
    fn rendered_layout_matches_current_errors_on_missing_previous_fragment() {
        let fragment_id = FragmentId::new(1);
        let rendered = render_result(vec![(
            fragment_id,
            fragment(
                fragment_id,
                [(
                    ActorId::new(1001),
                    actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3, 4, 5, 6, 7]))),
                )],
            ),
        )]);
        let prev: HashMap<FragmentId, &InflightFragmentInfo> = HashMap::new();

        assert!(rendered_layout_matches_current(&rendered, &prev).is_err());
    }
}
1240
1241#[derive(Clone, Debug, Eq, PartialEq)]
1242pub struct ParallelismPolicy {
1243 pub parallelism: StreamingParallelism,
1244 pub adaptive_parallelism_strategy: Option<String>,
1245}
1246
/// Resource-group placement requested for a streaming-job reschedule.
#[derive(Debug, Clone)]
pub struct ResourceGroupPolicy {
    /// Name of the resource group to place the job in, if one is specified.
    pub resource_group: Option<String>,
}
1251
1252#[derive(Clone, Debug)]
1253pub enum ReschedulePolicy {
1254 Parallelism(ParallelismPolicy),
1255 ResourceGroup(ResourceGroupPolicy),
1256 Both(ParallelismPolicy, ResourceGroupPolicy),
1257}
1258
1259impl GlobalStreamManager {
1260 #[await_tree::instrument("acquire_reschedule_read_guard")]
1261 pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
1262 self.scale_controller.reschedule_lock.read().await
1263 }
1264
1265 #[await_tree::instrument("acquire_reschedule_write_guard")]
1266 pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
1267 self.scale_controller.reschedule_lock.write().await
1268 }
1269
1270 async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
1279 tracing::info!("trigger parallelism control");
1280
1281 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
1282
1283 let background_streaming_jobs = self
1284 .metadata_manager
1285 .list_background_creating_jobs()
1286 .await?;
1287
1288 let blocked_jobs = self
1289 .metadata_manager
1290 .collect_reschedule_blocked_jobs_for_creating_jobs(&background_streaming_jobs, true)
1291 .await?;
1292 let has_blocked_jobs = !blocked_jobs.is_empty();
1293
1294 let database_objects: HashMap<risingwave_meta_model::DatabaseId, Vec<JobId>> = self
1295 .metadata_manager
1296 .catalog_controller
1297 .list_streaming_job_with_database()
1298 .await?;
1299
1300 let job_ids = database_objects
1301 .iter()
1302 .flat_map(|(database_id, job_ids)| {
1303 job_ids
1304 .iter()
1305 .enumerate()
1306 .map(move |(idx, job_id)| (idx, database_id, job_id))
1307 })
1308 .sorted_by(|(idx_a, database_a, _), (idx_b, database_b, _)| {
1309 idx_a.cmp(idx_b).then(database_a.cmp(database_b))
1310 })
1311 .map(|(_, database_id, job_id)| (*database_id, *job_id))
1312 .filter(|(_, job_id)| !blocked_jobs.contains(job_id))
1313 .collect_vec();
1314
1315 if job_ids.is_empty() {
1316 tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
1317 return Ok(has_blocked_jobs);
1321 }
1322
1323 let active_workers =
1324 ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;
1325
1326 tracing::info!(
1327 "trigger parallelism control for jobs: {:#?}, workers {:#?}",
1328 job_ids,
1329 active_workers.current()
1330 );
1331
1332 let batch_size = match self.env.opts.parallelism_control_batch_size {
1333 0 => job_ids.len(),
1334 n => n,
1335 };
1336
1337 tracing::info!(
1338 "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
1339 job_ids.len(),
1340 batch_size,
1341 active_workers.current()
1342 );
1343
1344 let batches: Vec<_> = job_ids
1345 .into_iter()
1346 .chunks(batch_size)
1347 .into_iter()
1348 .map(|chunk| chunk.collect_vec())
1349 .collect();
1350
1351 for batch in batches {
1352 let jobs = batch.iter().map(|(_, job_id)| *job_id).collect();
1353
1354 let commands = self.scale_controller.rerender(jobs).await?;
1355
1356 let futures = commands.into_iter().map(|(database_id, command)| {
1357 let barrier_scheduler = self.barrier_scheduler.clone();
1358 async move { barrier_scheduler.run_command(database_id, command).await }
1359 });
1360
1361 let _results = future::try_join_all(futures).await?;
1362 }
1363
1364 Ok(has_blocked_jobs)
1365 }
1366
1367 async fn run(&self, mut shutdown_rx: Receiver<()>) {
1369 tracing::info!("starting automatic parallelism control monitor");
1370
1371 let check_period =
1372 Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
1373
1374 let mut ticker = tokio::time::interval_at(
1375 Instant::now()
1376 + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
1377 check_period,
1378 );
1379 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
1380
1381 let (local_notification_tx, mut local_notification_rx) =
1382 tokio::sync::mpsc::unbounded_channel();
1383
1384 self.env
1385 .notification_manager()
1386 .insert_local_sender(local_notification_tx);
1387
1388 ticker.tick().await;
1390
1391 let worker_nodes = self
1392 .metadata_manager
1393 .list_active_streaming_compute_nodes()
1394 .await
1395 .expect("list active streaming compute nodes");
1396
1397 let mut worker_cache: BTreeMap<_, _> = worker_nodes
1398 .into_iter()
1399 .map(|worker| (worker.id, worker))
1400 .collect();
1401
1402 let mut should_trigger = false;
1403
1404 loop {
1405 tokio::select! {
1406 biased;
1407
1408 _ = &mut shutdown_rx => {
1409 tracing::info!("Stream manager is stopped");
1410 break;
1411 }
1412
1413 _ = ticker.tick(), if should_trigger => {
1414 let include_workers = worker_cache.keys().copied().collect_vec();
1415
1416 if include_workers.is_empty() {
1417 tracing::debug!("no available worker nodes");
1418 should_trigger = false;
1419 continue;
1420 }
1421
1422 match self.trigger_parallelism_control().await {
1423 Ok(cont) => {
1424 should_trigger = cont;
1425 }
1426 Err(e) => {
1427 tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
1428 ticker.reset();
1429 }
1430 }
1431 }
1432
1433 notification = local_notification_rx.recv() => {
1434 let notification = notification.expect("local notification channel closed in loop of stream manager");
1435
1436 let worker_is_streaming_compute = |worker: &WorkerNode| {
1438 worker.get_type() == Ok(WorkerType::ComputeNode)
1439 && worker.property.as_ref().unwrap().is_streaming
1440 };
1441
1442 match notification {
1443 LocalNotification::SystemParamsChange(_) => {}
1447 LocalNotification::WorkerNodeActivated(worker) => {
1448 if !worker_is_streaming_compute(&worker) {
1449 continue;
1450 }
1451
1452 tracing::info!(worker = %worker.id, "worker activated notification received");
1453
1454 let prev_worker = worker_cache.insert(worker.id, worker.clone());
1455
1456 match prev_worker {
1457 Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
1458 tracing::info!(worker = %worker.id, "worker parallelism changed");
1459 should_trigger = true;
1460 }
1461 Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
1462 tracing::info!(worker = %worker.id, "worker label changed");
1463 should_trigger = true;
1464 }
1465 None => {
1466 tracing::info!(worker = %worker.id, "new worker joined");
1467 should_trigger = true;
1468 }
1469 _ => {}
1470 }
1471 }
1472
1473 LocalNotification::WorkerNodeDeleted(worker) => {
1476 if !worker_is_streaming_compute(&worker) {
1477 continue;
1478 }
1479
1480 match worker_cache.remove(&worker.id) {
1481 Some(prev_worker) => {
1482 tracing::info!(worker = %prev_worker.id, "worker removed from stream manager cache");
1483 }
1484 None => {
1485 tracing::warn!(worker = %worker.id, "worker not found in stream manager cache, but it was removed");
1486 }
1487 }
1488 }
1489
1490 LocalNotification::StreamingJobBackfillFinished(job_id) => {
1491 tracing::debug!(job_id = %job_id, "received backfill finished notification");
1492 if let Err(e) = self.apply_post_backfill_parallelism(job_id).await {
1493 tracing::warn!(job_id = %job_id, error = %e.as_report(), "failed to restore parallelism after backfill");
1494 should_trigger = true;
1497 }
1498 }
1499
1500 _ => {}
1501 }
1502 }
1503 }
1504 }
1505 }
1506
1507 async fn apply_post_backfill_parallelism(&self, job_id: JobId) -> MetaResult<()> {
1509 let Some((
1512 target,
1513 target_adaptive_parallelism_strategy,
1514 backfill_parallelism,
1515 backfill_adaptive_parallelism_strategy,
1516 )) = self
1517 .metadata_manager
1518 .catalog_controller
1519 .get_job_parallelisms(job_id)
1520 .await?
1521 else {
1522 tracing::debug!(
1525 job_id = %job_id,
1526 "streaming job not found when applying post-backfill parallelism, skip"
1527 );
1528 return Ok(());
1529 };
1530
1531 match backfill_parallelism {
1533 Some(backfill_parallelism)
1534 if backfill_parallelism == target
1535 && backfill_adaptive_parallelism_strategy
1536 == target_adaptive_parallelism_strategy =>
1537 {
1538 tracing::debug!(
1541 job_id = %job_id,
1542 ?backfill_parallelism,
1543 ?target,
1544 "backfill parallelism equals job parallelism, skip reschedule"
1545 );
1546 return Ok(());
1547 }
1548 Some(_) => {
1549 }
1551 None => {
1552 tracing::debug!(
1555 job_id = %job_id,
1556 ?target,
1557 "no backfill parallelism configured, skip post-backfill reschedule"
1558 );
1559 return Ok(());
1560 }
1561 }
1562
1563 tracing::info!(
1565 job_id = %job_id,
1566 ?target,
1567 ?backfill_parallelism,
1568 "restoring parallelism after backfill via reschedule"
1569 );
1570 let policy = ReschedulePolicy::Parallelism(ParallelismPolicy {
1571 parallelism: target,
1572 adaptive_parallelism_strategy: target_adaptive_parallelism_strategy,
1573 });
1574 if let Err(e) = self.reschedule_streaming_job(job_id, policy, false).await {
1575 tracing::warn!(job_id = %job_id, error = %e.as_report(), "reschedule after backfill failed");
1576 return Err(e);
1577 }
1578
1579 tracing::info!(job_id = %job_id, "parallelism reschedule after backfill submitted");
1580 Ok(())
1581 }
1582
1583 pub fn start_auto_parallelism_monitor(
1584 self: Arc<Self>,
1585 ) -> (JoinHandle<()>, oneshot::Sender<()>) {
1586 tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
1587 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1588 let join_handle = tokio::spawn(async move {
1589 self.run(shutdown_rx).await;
1590 });
1591
1592 (join_handle, shutdown_tx)
1593 }
1594}