use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::future;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::hash::{ActorMapping, VnodeBitmapExt};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_connector::source::{SplitId, SplitMetaData};
use risingwave_meta_model::{
    StreamingParallelism, WorkerId, fragment, fragment_relation, object, streaming_job,
};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::stream_plan::{PbDispatchOutputMapping, PbDispatcher};
use sea_orm::{ConnectionTrait, JoinType, QuerySelect, RelationTrait};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule, RescheduleContext, ReschedulePlan};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, NoShuffleEnsemble,
    find_fragment_no_shuffle_dags_detailed, load_fragment_context, load_fragment_context_for_jobs,
};
use crate::error::bail_invalid_parameter;
use crate::manager::{ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{ActorId, FragmentId, StreamActor, StreamActorWithDispatchers};
use crate::stream::{GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

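/// A manual reschedule request expressed as the signed change in actor count per worker.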
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}

use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_meta_model::DispatcherType;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::prelude::{Fragment, FragmentRelation, StreamingJob};
use sea_orm::ActiveValue::Set;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait,
};

use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::{
    StreamingJobExtraInfo, compose_dispatchers, get_streaming_job_extra_info,
};

pub type ScaleControllerRef = Arc<ScaleController>;

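/// The layout-relevant view of a single actor: its worker, the vnodes it owns, and the ids
/// of its assigned splits. Actor ids and split offsets are deliberately dropped so that two
/// renderings of the same fragment compare equal regardless of id assignment.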
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct NormalizedActor {
    worker_id: WorkerId,
    vnode_bitmap: Option<Vec<usize>>,
    splits: Vec<SplitId>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
struct NormalizedFragmentLayout {
    distribution_type: DistributionType,
    actors: Vec<NormalizedActor>,
}

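/// Projects an [`InflightActorInfo`] into a [`NormalizedActor`], keeping only the worker,
/// the vnode indices, and the sorted split ids (split offsets are ignored).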
fn normalize_actor_info(info: &InflightActorInfo) -> NormalizedActor {
    let vnode_bitmap = info.vnode_bitmap.as_ref().map(|bitmap| {
        bitmap
            .iter_vnodes()
            .map(|vnode| vnode.to_index())
            .collect_vec()
    });
    let mut splits = info.splits.iter().map(SplitMetaData::id).collect_vec();
    splits.sort_unstable();
    NormalizedActor {
        worker_id: info.worker_id,
        vnode_bitmap,
        splits,
    }
}

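/// Builds the order-independent layout of a fragment by normalizing every actor and
/// sorting the result, so the comparison does not depend on actor id assignment.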
fn build_normalized_fragment_layout(
    fragment_info: &InflightFragmentInfo,
) -> NormalizedFragmentLayout {
    let mut actors = fragment_info
        .actors
        .values()
        .map(normalize_actor_info)
        .collect_vec();
    actors.sort_unstable();

    NormalizedFragmentLayout {
        distribution_type: fragment_info.distribution_type,
        actors,
    }
}

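/// Checks whether every rendered fragment has the same normalized layout (distribution
/// type, worker placement, vnode ownership, and split ids) as its current in-flight
/// counterpart. Returns `Ok(false)` on the first difference and an error if a rendered
/// fragment has no previous info.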
pub(crate) fn rendered_layout_matches_current(
    render_result: &FragmentRenderMap,
    all_prev_fragments: &HashMap<FragmentId, &InflightFragmentInfo>,
) -> MetaResult<bool> {
    let all_rendered_fragments: HashMap<_, _> = render_result
        .values()
        .flat_map(|jobs| jobs.values())
        .flatten()
        .map(|(fragment_id, info)| (*fragment_id, info))
        .collect();

    for (fragment_id, rendered_fragment) in &all_rendered_fragments {
        let Some(prev_fragment) = all_prev_fragments.get(fragment_id).copied() else {
            return Err(MetaError::from(anyhow!(
                "previous fragment info for {} not found",
                fragment_id
            )));
        };

        let rendered_layout = build_normalized_fragment_layout(rendered_fragment);
        let current_layout = build_normalized_fragment_layout(prev_fragment);

        if rendered_layout != current_layout {
            return Ok(false);
        }
    }

    Ok(true)
}

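/// Drives reschedule (scaling) operations: persists parallelism and resource-group changes
/// to the catalog and turns them into per-database [`Command::RescheduleIntent`]s.
/// `reschedule_lock` serializes rescheduling against other operations that take the guard.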
pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

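    /// Expands the given jobs to all jobs reachable through no-shuffle edges, i.e. the jobs
    /// whose fragments belong to the same no-shuffle ensembles and therefore have to be
    /// rescheduled together.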
    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[JobId],
    ) -> MetaResult<HashSet<JobId>> {
        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragment_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.is_in(jobs.to_vec()))
            .into_tuple()
            .all(&txn)
            .await?;
        let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
        let related_fragments = ensembles
            .iter()
            .flat_map(|ensemble| ensemble.fragments())
            .collect_vec();

        let job_ids: Vec<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::JobId)
            .filter(fragment::Column::FragmentId.is_in(related_fragments))
            .into_tuple()
            .all(&txn)
            .await?;

        let job_ids = job_ids.into_iter().collect();

        Ok(job_ids)
    }

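    /// Applies a [`ReschedulePolicy`] to each job: persists the new parallelism and/or
    /// resource group (rejecting a fixed parallelism above the job's `max_parallelism`)
    /// and returns one reschedule intent per affected database.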
    pub async fn reschedule_inplace(
        &self,
        policy: HashMap<JobId, ReschedulePolicy>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        for (table_id, target) in &policy {
            let streaming_job = StreamingJob::find_by_id(*table_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;

            let max_parallelism = streaming_job.max_parallelism;

            let mut streaming_job = streaming_job.into_active_model();

            match &target {
                ReschedulePolicy::Parallelism(p) | ReschedulePolicy::Both(p, _) => {
                    if let StreamingParallelism::Fixed(n) = p.parallelism
                        && n > max_parallelism as usize
                    {
                        bail!(format!(
                            "specified parallelism {n} should not exceed max parallelism {max_parallelism}"
                        ));
                    }

                    streaming_job.parallelism = Set(p.parallelism.clone());
                }
                _ => {}
            }

            match &target {
                ReschedulePolicy::ResourceGroup(r) | ReschedulePolicy::Both(_, r) => {
                    streaming_job.specific_resource_group = Set(r.resource_group.clone());
                }
                _ => {}
            }

            StreamingJob::update(streaming_job).exec(&txn).await?;
        }

        let job_ids: HashSet<JobId> = policy.keys().copied().collect();
        let commands = build_reschedule_intent_for_jobs(&txn, job_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }

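    /// Updates the persisted `backfill_parallelism` of the given jobs, validating it
    /// against each job's `max_parallelism`, and returns per-database reschedule intents.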
    pub async fn reschedule_backfill_parallelism_inplace(
        &self,
        policy: HashMap<JobId, Option<StreamingParallelism>>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        if policy.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        for (table_id, parallelism) in &policy {
            let streaming_job = StreamingJob::find_by_id(*table_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?;

            let max_parallelism = streaming_job.max_parallelism;

            let mut streaming_job = streaming_job.into_active_model();

            if let Some(StreamingParallelism::Fixed(n)) = parallelism
                && *n > max_parallelism as usize
            {
                bail!(format!(
                    "specified backfill parallelism {n} should not exceed max parallelism {max_parallelism}"
                ));
            }

            streaming_job.backfill_parallelism = Set(parallelism.clone());
            streaming_job.update(&txn).await?;
        }

        let jobs = policy.keys().copied().collect();

        let command = build_reschedule_intent_for_jobs(&txn, jobs).await?;

        txn.commit().await?;

        Ok(command)
    }

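    /// Reschedules individual fragments by their no-shuffle ensembles: the requested
    /// parallelism must target an ensemble's entry fragments and must not conflict within
    /// one ensemble; unchanged ensembles are skipped before intents are built.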
    pub async fn reschedule_fragment_inplace(
        &self,
        policy: HashMap<risingwave_meta_model::FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<HashMap<DatabaseId, Command>> {
        if policy.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.write().await;
        let txn = inner.db.begin().await?;

        let fragment_id_list = policy.keys().copied().collect_vec();

        let existing_fragment_ids: HashSet<_> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
            .into_tuple::<risingwave_meta_model::FragmentId>()
            .all(&txn)
            .await?
            .into_iter()
            .collect();

        if let Some(missing_fragment_id) = fragment_id_list
            .iter()
            .find(|fragment_id| !existing_fragment_ids.contains(*fragment_id))
        {
            return Err(MetaError::catalog_id_not_found(
                "fragment",
                *missing_fragment_id,
            ));
        }

        let mut target_ensembles = vec![];

        for ensemble in find_fragment_no_shuffle_dags_detailed(&txn, &fragment_id_list).await? {
            let entry_fragment_ids = ensemble.entry_fragments().collect_vec();

            let desired_parallelism = match entry_fragment_ids
                .iter()
                .filter_map(|fragment_id| policy.get(fragment_id).cloned())
                .dedup()
                .collect_vec()
                .as_slice()
            {
                [] => {
                    bail_invalid_parameter!(
                        "none of the entry fragments {:?} were included in the reschedule request; \
                         provide at least one entry fragment id",
                        entry_fragment_ids
                    );
                }
                [parallelism] => parallelism.clone(),
                parallelisms => {
                    bail!(
                        "conflicting reschedule policies for fragments in the same no-shuffle ensemble: {:?}",
                        parallelisms
                    );
                }
            };

            let fragments = Fragment::find()
                .filter(fragment::Column::FragmentId.is_in(entry_fragment_ids))
                .all(&txn)
                .await?;

            debug_assert!(
                fragments
                    .iter()
                    .map(|fragment| fragment.parallelism.as_ref())
                    .all_equal(),
                "entry fragments in the same ensemble should share the same parallelism"
            );

            let current_parallelism = fragments
                .first()
                .and_then(|fragment| fragment.parallelism.clone());

            if current_parallelism == desired_parallelism {
                continue;
            }

            for fragment in fragments {
                let mut fragment = fragment.into_active_model();
                fragment.parallelism = Set(desired_parallelism.clone());
                Fragment::update(fragment).exec(&txn).await?;
            }

            target_ensembles.push(ensemble);
        }

        if target_ensembles.is_empty() {
            txn.commit().await?;
            return Ok(HashMap::new());
        }

        let target_fragment_ids: HashSet<FragmentId> = target_ensembles
            .iter()
            .flat_map(|ensemble| ensemble.component_fragments())
            .collect();
        let commands = build_reschedule_intent_for_fragments(&txn, target_fragment_ids).await?;

        txn.commit().await?;

        Ok(commands)
    }

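    /// Re-renders the given jobs against the current catalog state and returns the
    /// resulting reschedule intents per database; used by the automatic parallelism
    /// control loop.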
    async fn rerender(&self, jobs: HashSet<JobId>) -> MetaResult<HashMap<DatabaseId, Command>> {
        if jobs.is_empty() {
            return Ok(HashMap::new());
        }

        let inner = self.metadata_manager.catalog_controller.inner.read().await;
        let txn = inner.db.begin().await?;
        let commands = build_reschedule_intent_for_jobs(&txn, jobs).await?;
        txn.commit().await?;
        Ok(commands)
    }
}

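/// Resolves the database of every job, loads the fragment context for the jobs, and wraps
/// each database's context into a [`Command::RescheduleIntent`] without a precomputed plan.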
async fn build_reschedule_intent_for_jobs(
    txn: &impl ConnectionTrait,
    job_ids: HashSet<JobId>,
) -> MetaResult<HashMap<DatabaseId, Command>> {
    if job_ids.is_empty() {
        return Ok(HashMap::new());
    }

    let job_id_list = job_ids.iter().copied().collect_vec();
    let database_jobs: Vec<(DatabaseId, JobId)> = StreamingJob::find()
        .select_only()
        .column(object::Column::DatabaseId)
        .column(streaming_job::Column::JobId)
        .join(JoinType::LeftJoin, streaming_job::Relation::Object.def())
        .filter(streaming_job::Column::JobId.is_in(job_id_list.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    if database_jobs.len() != job_ids.len() {
        let returned_jobs: HashSet<JobId> =
            database_jobs.iter().map(|(_, job_id)| *job_id).collect();
        let missing = job_ids.difference(&returned_jobs).copied().collect_vec();
        return Err(MetaError::catalog_id_not_found(
            "streaming job",
            format!("{missing:?}"),
        ));
    }

    let reschedule_context = load_reschedule_context_for_jobs(txn, job_ids).await?;
    if reschedule_context.is_empty() {
        return Ok(HashMap::new());
    }

    let commands = reschedule_context
        .into_database_contexts()
        .into_iter()
        .map(|(database_id, context)| {
            (
                database_id,
                Command::RescheduleIntent {
                    context,
                    reschedule_plan: None,
                },
            )
        })
        .collect();

    Ok(commands)
}

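/// Same as [`build_reschedule_intent_for_jobs`], but starting from fragment ids: validates
/// that every fragment exists, expands them to their no-shuffle ensembles, and emits one
/// [`Command::RescheduleIntent`] per affected database.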
async fn build_reschedule_intent_for_fragments(
    txn: &impl ConnectionTrait,
    fragment_ids: HashSet<FragmentId>,
) -> MetaResult<HashMap<DatabaseId, Command>> {
    if fragment_ids.is_empty() {
        return Ok(HashMap::new());
    }

    let fragment_id_list = fragment_ids.iter().copied().collect_vec();
    let fragment_databases: Vec<(FragmentId, DatabaseId)> = Fragment::find()
        .select_only()
        .column(fragment::Column::FragmentId)
        .column(object::Column::DatabaseId)
        .join(JoinType::LeftJoin, fragment::Relation::Object.def())
        .filter(fragment::Column::FragmentId.is_in(fragment_id_list.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    if fragment_databases.len() != fragment_ids.len() {
        let returned: HashSet<FragmentId> = fragment_databases
            .iter()
            .map(|(fragment_id, _)| *fragment_id)
            .collect();
        let missing = fragment_ids.difference(&returned).copied().collect_vec();
        return Err(MetaError::catalog_id_not_found(
            "fragment",
            format!("{missing:?}"),
        ));
    }

    let ensembles = find_fragment_no_shuffle_dags_detailed(txn, &fragment_id_list).await?;
    let reschedule_context = load_reschedule_context_for_ensembles(txn, ensembles).await?;
    if reschedule_context.is_empty() {
        return Ok(HashMap::new());
    }

    let commands = reschedule_context
        .into_database_contexts()
        .into_iter()
        .map(|(database_id, context)| {
            (
                database_id,
                Command::RescheduleIntent {
                    context,
                    reschedule_plan: None,
                },
            )
        })
        .collect();

    Ok(commands)
}

async fn load_reschedule_context_for_jobs(
    txn: &impl ConnectionTrait,
    job_ids: HashSet<JobId>,
) -> MetaResult<RescheduleContext> {
    let loaded = load_fragment_context_for_jobs(txn, job_ids).await?;
    build_reschedule_context_from_loaded(txn, loaded).await
}

async fn load_reschedule_context_for_ensembles(
    txn: &impl ConnectionTrait,
    ensembles: Vec<NoShuffleEnsemble>,
) -> MetaResult<RescheduleContext> {
    let loaded = load_fragment_context(txn, ensembles).await?;
    build_reschedule_context_from_loaded(txn, loaded).await
}

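/// Enriches a [`LoadedFragmentContext`] into a [`RescheduleContext`] by fetching the jobs'
/// extra info and the upstream/downstream fragment relations (keyed by dispatcher type)
/// that are later needed to rebuild dispatchers.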
async fn build_reschedule_context_from_loaded(
    txn: &impl ConnectionTrait,
    loaded: LoadedFragmentContext,
) -> MetaResult<RescheduleContext> {
    if loaded.is_empty() {
        return Ok(RescheduleContext::empty());
    }

    let job_ids = loaded.job_map.keys().copied().collect_vec();
    let job_extra_info = get_streaming_job_extra_info(txn, job_ids).await?;

    let fragment_ids = loaded
        .job_fragments
        .values()
        .flat_map(|fragments| fragments.keys().copied())
        .collect_vec();

    let upstreams: Vec<(FragmentId, FragmentId, DispatcherType)> = FragmentRelation::find()
        .select_only()
        .columns([
            fragment_relation::Column::TargetFragmentId,
            fragment_relation::Column::SourceFragmentId,
            fragment_relation::Column::DispatcherType,
        ])
        .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
        .into_tuple()
        .all(txn)
        .await?;

    let mut upstream_fragments = HashMap::new();
    for (fragment, upstream, dispatcher) in upstreams {
        upstream_fragments
            .entry(fragment as FragmentId)
            .or_insert(HashMap::new())
            .insert(upstream as FragmentId, dispatcher);
    }

    let downstreams = FragmentRelation::find()
        .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids.clone()))
        .all(txn)
        .await?;

    let mut downstream_fragments = HashMap::new();
    let mut downstream_relations = HashMap::new();
    for relation in downstreams {
        let source_fragment_id = relation.source_fragment_id as FragmentId;
        let target_fragment_id = relation.target_fragment_id as FragmentId;
        downstream_fragments
            .entry(source_fragment_id)
            .or_insert(HashMap::new())
            .insert(target_fragment_id, relation.dispatcher_type);
        downstream_relations.insert((source_fragment_id, target_fragment_id), relation);
    }

    Ok(RescheduleContext {
        loaded,
        job_extra_info,
        upstream_fragments,
        downstream_fragments,
        downstream_relations,
    })
}

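/// Computes the [`Reschedule`] for a single fragment by diffing its previous actors against
/// the newly rendered ones: actors added per worker, actors removed, the new hash dispatcher
/// mapping, affected upstream/downstream fragments (no-shuffle edges excluded), split
/// assignments, and the definitions of the newly created actors.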
fn diff_fragment(
    prev_fragment_info: &InflightFragmentInfo,
    curr_actors: &HashMap<ActorId, InflightActorInfo>,
    upstream_fragments: HashMap<FragmentId, DispatcherType>,
    downstream_fragments: HashMap<FragmentId, DispatcherType>,
    all_actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    job_extra_info: Option<&StreamingJobExtraInfo>,
) -> MetaResult<Reschedule> {
    let prev_ids: HashSet<_> = prev_fragment_info.actors.keys().cloned().collect();
    let curr_ids: HashSet<_> = curr_actors.keys().cloned().collect();

    let removed_actors: HashSet<_> = &prev_ids - &curr_ids;
    let added_actor_ids: HashSet<_> = &curr_ids - &prev_ids;
    let kept_ids: HashSet<_> = prev_ids.intersection(&curr_ids).cloned().collect();
    debug_assert!(
        kept_ids.is_empty(),
        "kept actors found in scale; expected full rebuild, prev={prev_ids:?}, curr={curr_ids:?}, kept={kept_ids:?}"
    );

    let mut added_actors = HashMap::new();
    for &actor_id in &added_actor_ids {
        let InflightActorInfo { worker_id, .. } = curr_actors
            .get(&actor_id)
            .ok_or_else(|| anyhow!("BUG: Worker not found for new actor {}", actor_id))?;

        added_actors
            .entry(*worker_id)
            .or_insert_with(Vec::new)
            .push(actor_id);
    }

    let mut vnode_bitmap_updates = HashMap::new();
    for actor_id in kept_ids {
        let prev_actor = &prev_fragment_info.actors[&actor_id];
        let curr_actor = &curr_actors[&actor_id];

        if prev_actor.vnode_bitmap != curr_actor.vnode_bitmap
            && let Some(bitmap) = curr_actor.vnode_bitmap.clone()
        {
            vnode_bitmap_updates.insert(actor_id, bitmap);
        }
    }

    let upstream_dispatcher_mapping =
        if let DistributionType::Hash = prev_fragment_info.distribution_type {
            let actor_mapping = curr_actors
                .iter()
                .map(
                    |(
                        actor_id,
                        InflightActorInfo {
                            worker_id: _,
                            vnode_bitmap,
                            ..
                        },
                    )| { (*actor_id, vnode_bitmap.clone().unwrap()) },
                )
                .collect();
            Some(ActorMapping::from_bitmaps(&actor_mapping))
        } else {
            None
        };

    let upstream_fragment_dispatcher_ids = upstream_fragments
        .iter()
        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
        .map(|(upstream_fragment, _)| (*upstream_fragment, prev_fragment_info.fragment_id))
        .collect();

    let downstream_fragment_ids = downstream_fragments
        .iter()
        .filter(|&(_, dispatcher_type)| *dispatcher_type != DispatcherType::NoShuffle)
        .map(|(fragment_id, _)| *fragment_id)
        .collect();

    let extra_info = job_extra_info.cloned().unwrap_or_default();
    let expr_context = extra_info.stream_context().to_expr_context();
    let job_definition = extra_info.job_definition;
    let config_override = extra_info.config_override;

    let newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)> =
        added_actor_ids
            .iter()
            .map(|actor_id| {
                let actor = StreamActor {
                    actor_id: *actor_id,
                    fragment_id: prev_fragment_info.fragment_id,
                    vnode_bitmap: curr_actors[actor_id].vnode_bitmap.clone(),
                    mview_definition: job_definition.clone(),
                    expr_context: Some(expr_context.clone()),
                    config_override: config_override.clone(),
                };
                (
                    *actor_id,
                    (
                        (
                            actor,
                            all_actor_dispatchers
                                .get(actor_id)
                                .cloned()
                                .unwrap_or_default(),
                        ),
                        curr_actors[actor_id].worker_id,
                    ),
                )
            })
            .collect();

    let actor_splits = curr_actors
        .iter()
        .map(|(&actor_id, info)| (actor_id, info.splits.clone()))
        .collect();

    let reschedule = Reschedule {
        added_actors,
        removed_actors,
        vnode_bitmap_updates,
        upstream_fragment_dispatcher_ids,
        upstream_dispatcher_mapping,
        downstream_fragment_ids,
        actor_splits,
        newly_created_actors,
    };

    Ok(reschedule)
}

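/// Turns a rendered fragment map plus its [`RescheduleContext`] into per-database
/// [`ReschedulePlan`]s: for every rendered fragment it recomposes the dispatchers towards
/// its downstream fragments (rendered or untouched) and diffs the fragment against its
/// previous in-flight state via [`diff_fragment`].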
pub(crate) fn build_reschedule_commands(
    render_result: FragmentRenderMap,
    context: RescheduleContext,
    all_prev_fragments: HashMap<FragmentId, &InflightFragmentInfo>,
) -> MetaResult<HashMap<DatabaseId, ReschedulePlan>> {
    if render_result.is_empty() {
        return Ok(HashMap::new());
    }

    let RescheduleContext {
        job_extra_info,
        upstream_fragments: mut all_upstream_fragments,
        downstream_fragments: mut all_downstream_fragments,
        mut downstream_relations,
        ..
    } = context;

    let fragment_ids = render_result
        .values()
        .flat_map(|jobs| jobs.values())
        .flatten()
        .map(|(fragment_id, _)| *fragment_id)
        .collect_vec();

    let all_related_fragment_ids: HashSet<_> = fragment_ids
        .iter()
        .copied()
        .chain(all_upstream_fragments.values().flatten().map(|(id, _)| *id))
        .chain(
            all_downstream_fragments
                .values()
                .flatten()
                .map(|(id, _)| *id),
        )
        .collect();

    for fragment_id in all_related_fragment_ids {
        if !all_prev_fragments.contains_key(&fragment_id) {
            return Err(MetaError::from(anyhow!(
                "previous fragment info for {fragment_id} not found"
            )));
        }
    }

    let all_rendered_fragments: HashMap<_, _> = render_result
        .values()
        .flat_map(|jobs| jobs.values())
        .flatten()
        .map(|(fragment_id, info)| (*fragment_id, info))
        .collect();

    let mut commands = HashMap::new();

    for (database_id, jobs) in &render_result {
        let mut all_fragment_actors = HashMap::new();
        let mut reschedules = HashMap::new();

        for (job_id, fragment_id, fragment_info) in jobs.iter().flat_map(|(job_id, fragments)| {
            fragments
                .iter()
                .map(move |(fragment_id, info)| (job_id, fragment_id, info))
        }) {
            let InflightFragmentInfo {
                distribution_type,
                actors,
                ..
            } = fragment_info;

            let upstream_fragments = all_upstream_fragments
                .remove(&(*fragment_id as FragmentId))
                .unwrap_or_default();
            let downstream_fragments = all_downstream_fragments
                .remove(&(*fragment_id as FragmentId))
                .unwrap_or_default();

            let fragment_actors: HashMap<_, _> = upstream_fragments
                .keys()
                .copied()
                .chain(downstream_fragments.keys().copied())
                .map(|fragment_id| {
                    all_prev_fragments
                        .get(&fragment_id)
                        .map(|fragment| {
                            (
                                fragment_id,
                                fragment.actors.keys().copied().collect::<HashSet<_>>(),
                            )
                        })
                        .ok_or_else(|| {
                            MetaError::from(anyhow!(
                                "fragment {} not found in previous state",
                                fragment_id
                            ))
                        })
                })
                .collect::<MetaResult<_>>()?;

            all_fragment_actors.extend(fragment_actors);

            let source_fragment_actors = actors
                .iter()
                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                .collect();

            let mut all_actor_dispatchers: HashMap<_, Vec<_>> = HashMap::new();

            for downstream_fragment_id in downstream_fragments.keys() {
                let target_fragment_actors =
                    match all_rendered_fragments.get(downstream_fragment_id) {
                        None => {
                            let external_fragment = all_prev_fragments
                                .get(downstream_fragment_id)
                                .ok_or_else(|| {
                                    MetaError::from(anyhow!(
                                        "fragment {} not found in previous state",
                                        downstream_fragment_id
                                    ))
                                })?;

                            external_fragment
                                .actors
                                .iter()
                                .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                                .collect()
                        }
                        Some(downstream_rendered) => downstream_rendered
                            .actors
                            .iter()
                            .map(|(actor_id, info)| (*actor_id, info.vnode_bitmap.clone()))
                            .collect(),
                    };

                let target_fragment_distribution = *distribution_type;

                let fragment_relation::Model {
                    source_fragment_id: _,
                    target_fragment_id: _,
                    dispatcher_type,
                    dist_key_indices,
                    output_indices,
                    output_type_mapping,
                } = downstream_relations
                    .remove(&(
                        *fragment_id as FragmentId,
                        *downstream_fragment_id as FragmentId,
                    ))
                    .ok_or_else(|| {
                        MetaError::from(anyhow!(
                            "downstream relation missing for {} -> {}",
                            fragment_id,
                            downstream_fragment_id
                        ))
                    })?;

                let pb_mapping = PbDispatchOutputMapping {
                    indices: output_indices.into_u32_array(),
                    types: output_type_mapping.unwrap_or_default().to_protobuf(),
                };

                let (dispatchers, _) = compose_dispatchers(
                    *distribution_type,
                    &source_fragment_actors,
                    *downstream_fragment_id,
                    target_fragment_distribution,
                    &target_fragment_actors,
                    dispatcher_type,
                    dist_key_indices.into_u32_array(),
                    pb_mapping,
                );

                for (actor_id, dispatcher) in dispatchers {
                    all_actor_dispatchers
                        .entry(actor_id)
                        .or_default()
                        .push(dispatcher);
                }
            }

            let prev_fragment = all_prev_fragments.get(&{ *fragment_id }).ok_or_else(|| {
                MetaError::from(anyhow!(
                    "fragment {} not found in previous state",
                    fragment_id
                ))
            })?;

            let reschedule = diff_fragment(
                prev_fragment,
                actors,
                upstream_fragments,
                downstream_fragments,
                all_actor_dispatchers,
                job_extra_info.get(job_id),
            )?;

            reschedules.insert(*fragment_id as FragmentId, reschedule);
        }

        let command = ReschedulePlan {
            reschedules,
            fragment_actors: all_fragment_actors,
        };

        debug_assert!(
            command
                .reschedules
                .values()
                .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
            "reschedule plan carries vnode_bitmap_updates, expected full rebuild"
        );

        commands.insert(*database_id, command);
    }

    Ok(commands)
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::FragmentTypeMask;
    use risingwave_connector::source::SplitImpl;
    use risingwave_connector::source::test_source::TestSourceSplit;

    use super::*;

    fn actor(worker_id: impl Into<WorkerId>, vnode_bitmap: Option<Bitmap>) -> InflightActorInfo {
        InflightActorInfo {
            worker_id: worker_id.into(),
            vnode_bitmap,
            splits: vec![],
        }
    }

    fn actor_with_splits(
        worker_id: impl Into<WorkerId>,
        vnode_bitmap: Option<Bitmap>,
        splits: Vec<SplitImpl>,
    ) -> InflightActorInfo {
        InflightActorInfo {
            worker_id: worker_id.into(),
            vnode_bitmap,
            splits,
        }
    }

    fn test_split(id: &str, offset: &str) -> SplitImpl {
        SplitImpl::Test(TestSourceSplit {
            id: id.into(),
            properties: HashMap::new(),
            offset: offset.to_owned(),
        })
    }

    fn fragment(
        fragment_id: FragmentId,
        actors: impl IntoIterator<Item = (ActorId, InflightActorInfo)>,
    ) -> InflightFragmentInfo {
        InflightFragmentInfo {
            fragment_id,
            distribution_type: DistributionType::Hash,
            fragment_type_mask: FragmentTypeMask::empty(),
            vnode_count: 8,
            nodes: Default::default(),
            actors: actors.into_iter().collect(),
            state_table_ids: HashSet::new(),
        }
    }

    fn render_result(fragments: Vec<(FragmentId, InflightFragmentInfo)>) -> FragmentRenderMap {
        HashMap::from([(
            DatabaseId::new(1),
            HashMap::from([(
                JobId::new(1),
                fragments.into_iter().collect::<HashMap<_, _>>(),
            )]),
        )])
    }

    #[test]
    fn rendered_layout_matches_current_ignores_actor_ids_across_fragments() {
        let current_source = fragment(
            FragmentId::new(1),
            [
                (
                    ActorId::new(101),
                    actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                ),
                (
                    ActorId::new(102),
                    actor(2, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                ),
            ],
        );
        let current_target = fragment(
            FragmentId::new(2),
            [
                (
                    ActorId::new(201),
                    actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                ),
                (
                    ActorId::new(202),
                    actor(4, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                ),
            ],
        );

        let rendered = render_result(vec![
            (
                FragmentId::new(1),
                fragment(
                    FragmentId::new(1),
                    [
                        (
                            ActorId::new(1001),
                            actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                        ),
                        (
                            ActorId::new(1002),
                            actor(2, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                        ),
                    ],
                ),
            ),
            (
                FragmentId::new(2),
                fragment(
                    FragmentId::new(2),
                    [
                        (
                            ActorId::new(2001),
                            actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                        ),
                        (
                            ActorId::new(2002),
                            actor(4, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                        ),
                    ],
                ),
            ),
        ]);

        let prev = HashMap::from([
            (FragmentId::new(1), &current_source),
            (FragmentId::new(2), &current_target),
        ]);

        assert!(rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    #[test]
    fn rendered_layout_matches_current_detects_target_fragment_layout_changes() {
        let current_source = fragment(
            FragmentId::new(1),
            [
                (
                    ActorId::new(101),
                    actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                ),
                (
                    ActorId::new(102),
                    actor(2, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                ),
            ],
        );
        let current_target = fragment(
            FragmentId::new(2),
            [
                (
                    ActorId::new(201),
                    actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                ),
                (
                    ActorId::new(202),
                    actor(4, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                ),
            ],
        );

        let rendered = render_result(vec![
            (
                FragmentId::new(1),
                fragment(
                    FragmentId::new(1),
                    [
                        (
                            ActorId::new(1001),
                            actor(1, Some(Bitmap::from_indices(8, &[0, 1, 2, 3]))),
                        ),
                        (
                            ActorId::new(1002),
                            actor(2, Some(Bitmap::from_indices(8, &[4, 5, 6, 7]))),
                        ),
                    ],
                ),
            ),
            (
                FragmentId::new(2),
                fragment(
                    FragmentId::new(2),
                    [
                        (
                            ActorId::new(2001),
                            actor(3, Some(Bitmap::from_indices(8, &[0, 1, 2, 3, 4, 5]))),
                        ),
                        (
                            ActorId::new(2002),
                            actor(4, Some(Bitmap::from_indices(8, &[6, 7]))),
                        ),
                    ],
                ),
            ),
        ]);

        let prev = HashMap::from([
            (FragmentId::new(1), &current_source),
            (FragmentId::new(2), &current_target),
        ]);

        assert!(!rendered_layout_matches_current(&rendered, &prev).unwrap());
    }

    #[test]
    fn rendered_layout_matches_current_ignores_split_offsets() {
        let fragment_id = FragmentId::new(1);
        let prev_fragment = fragment(
            fragment_id,
            [(
                ActorId::new(101),
                actor_with_splits(
                    1,
                    Some(Bitmap::from_indices(8, &[0, 1, 2, 3])),
                    vec![test_split("split-0", "100")],
                ),
            )],
        );
        let rendered = render_result(vec![(
            fragment_id,
            fragment(
                fragment_id,
                [(
                    ActorId::new(1001),
                    actor_with_splits(
                        1,
                        Some(Bitmap::from_indices(8, &[0, 1, 2, 3])),
                        vec![test_split("split-0", "200")],
                    ),
                )],
            ),
        )]);
        let prev = HashMap::from([(fragment_id, &prev_fragment)]);

        assert!(rendered_layout_matches_current(&rendered, &prev).unwrap());
    }
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParallelismPolicy {
    pub parallelism: StreamingParallelism,
}

#[derive(Clone, Debug)]
pub struct ResourceGroupPolicy {
    pub resource_group: Option<String>,
}

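/// What a reschedule request changes for a streaming job: its parallelism, its resource
/// group, or both at once.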
#[derive(Clone, Debug)]
pub enum ReschedulePolicy {
    Parallelism(ParallelismPolicy),
    ResourceGroup(ResourceGroupPolicy),
    Both(ParallelismPolicy, ResourceGroupPolicy),
}

impl GlobalStreamManager {
    #[await_tree::instrument("acquire_reschedule_read_guard")]
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    #[await_tree::instrument("acquire_reschedule_write_guard")]
    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }

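    /// Runs one pass of automatic parallelism control: re-renders all streaming jobs
    /// (skipping those blocked by in-progress background creation) in batches of
    /// `parallelism_control_batch_size` and submits the resulting reschedule intents to
    /// the barrier scheduler. Returns whether blocked jobs remain, so the caller knows
    /// another pass is needed.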
    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
        tracing::info!("trigger parallelism control");

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_streaming_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        let blocked_jobs = self
            .metadata_manager
            .collect_reschedule_blocked_jobs_for_creating_jobs(&background_streaming_jobs, true)
            .await?;
        let has_blocked_jobs = !blocked_jobs.is_empty();

        let database_objects: HashMap<risingwave_meta_model::DatabaseId, Vec<JobId>> = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_with_database()
            .await?;

        let job_ids = database_objects
            .iter()
            .flat_map(|(database_id, job_ids)| {
                job_ids
                    .iter()
                    .enumerate()
                    .map(move |(idx, job_id)| (idx, database_id, job_id))
            })
            .sorted_by(|(idx_a, database_a, _), (idx_b, database_b, _)| {
                idx_a.cmp(idx_b).then(database_a.cmp(database_b))
            })
            .map(|(_, database_id, job_id)| (*database_id, *job_id))
            .filter(|(_, job_id)| !blocked_jobs.contains(job_id))
            .collect_vec();

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
            return Ok(has_blocked_jobs);
        }

        let active_workers =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        tracing::info!(
            "trigger parallelism control for jobs: {:#?}, workers {:#?}",
            job_ids,
            active_workers.current()
        );

        let batch_size = match self.env.opts.parallelism_control_batch_size {
            0 => job_ids.len(),
            n => n,
        };

        tracing::info!(
            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
            job_ids.len(),
            batch_size,
            active_workers.current()
        );

        let batches: Vec<_> = job_ids
            .into_iter()
            .chunks(batch_size)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect();

        for batch in batches {
            let jobs = batch.iter().map(|(_, job_id)| *job_id).collect();

            let commands = self.scale_controller.rerender(jobs).await?;

            let futures = commands.into_iter().map(|(database_id, command)| {
                let barrier_scheduler = self.barrier_scheduler.clone();
                async move { barrier_scheduler.run_command(database_id, command).await }
            });

            let _results = future::try_join_all(futures).await?;
        }

        Ok(has_blocked_jobs)
    }

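    /// Background loop for automatic parallelism control: caches the streaming compute
    /// workers, reacts to worker activation/deletion, adaptive-parallelism strategy
    /// changes, and backfill-finished notifications, and calls
    /// [`Self::trigger_parallelism_control`] on the periodic tick while changes are pending.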
    async fn run(&self, mut shutdown_rx: Receiver<()>) {
        tracing::info!("starting automatic parallelism control monitor");

        let check_period =
            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

        let mut ticker = tokio::time::interval_at(
            Instant::now()
                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
            check_period,
        );
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        ticker.tick().await;

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        let mut should_trigger = false;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = %worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = %worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = %worker.id, "worker label changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = %worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = %prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = %worker.id, "worker not found in stream manager cache, but it was removed");
                                }
                            }
                        }

                        LocalNotification::StreamingJobBackfillFinished(job_id) => {
                            tracing::debug!(job_id = %job_id, "received backfill finished notification");
                            if let Err(e) = self.apply_post_backfill_parallelism(job_id).await {
                                tracing::warn!(job_id = %job_id, error = %e.as_report(), "failed to restore parallelism after backfill");
                                should_trigger = true;
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

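    /// After a job finishes backfilling, restores its target parallelism if a distinct
    /// `backfill_parallelism` was in effect; skips the reschedule when none was configured
    /// or it already equals the target.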
    async fn apply_post_backfill_parallelism(&self, job_id: JobId) -> MetaResult<()> {
        let Some((target, backfill_parallelism)) = self
            .metadata_manager
            .catalog_controller
            .get_job_parallelisms(job_id)
            .await?
        else {
            tracing::debug!(
                job_id = %job_id,
                "streaming job not found when applying post-backfill parallelism, skip"
            );
            return Ok(());
        };

        match backfill_parallelism {
            Some(backfill_parallelism) if backfill_parallelism == target => {
                tracing::debug!(
                    job_id = %job_id,
                    ?backfill_parallelism,
                    ?target,
                    "backfill parallelism equals job parallelism, skip reschedule"
                );
                return Ok(());
            }
            Some(_) => {}
            None => {
                tracing::debug!(
                    job_id = %job_id,
                    ?target,
                    "no backfill parallelism configured, skip post-backfill reschedule"
                );
                return Ok(());
            }
        }

        tracing::info!(
            job_id = %job_id,
            ?target,
            ?backfill_parallelism,
            "restoring parallelism after backfill via reschedule"
        );
        let policy = ReschedulePolicy::Parallelism(ParallelismPolicy {
            parallelism: target,
        });
        if let Err(e) = self.reschedule_streaming_job(job_id, policy, false).await {
            tracing::warn!(job_id = %job_id, error = %e.as_report(), "reschedule after backfill failed");
            return Err(e);
        }

        tracing::info!(job_id = %job_id, "parallelism reschedule after backfill submitted");
        Ok(())
    }

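    /// Spawns the [`Self::run`] monitor loop on a new task and returns its join handle
    /// together with a shutdown sender that stops it.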
    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}