pub struct ScaleController {
pub metadata_manager: MetadataManager,
pub source_manager: SourceManagerRef,
pub env: MetaSrvEnv,
pub reschedule_lock: RwLock<()>,
}
Fields§
§metadata_manager: MetadataManager
§source_manager: SourceManagerRef
§env: MetaSrvEnv
§reschedule_lock: RwLock<()>
This lock is acquired during DDL to prevent scaling operations on jobs that are in the creating state; e.g., an MV cannot be rescheduled during foreground backfill.
Implementations§
source§impl ScaleController
impl ScaleController
pub fn new( metadata_manager: &MetadataManager, source_manager: SourceManagerRef, env: MetaSrvEnv, ) -> Self
pub async fn integrity_check(&self) -> MetaResult<()>
sourceasync fn build_reschedule_context(
&self,
reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
options: RescheduleOptions,
table_parallelisms: Option<&mut HashMap<TableId, TableParallelism>>,
) -> MetaResult<RescheduleContext>
async fn build_reschedule_context( &self, reschedule: &mut HashMap<FragmentId, WorkerReschedule>, options: RescheduleOptions, table_parallelisms: Option<&mut HashMap<TableId, TableParallelism>>, ) -> MetaResult<RescheduleContext>
Build the context for rescheduling and do some validation for the request.
sourcepub(crate) async fn analyze_reschedule_plan(
&self,
reschedules: HashMap<FragmentId, WorkerReschedule>,
options: RescheduleOptions,
table_parallelisms: Option<&mut HashMap<TableId, TableParallelism>>,
) -> MetaResult<HashMap<FragmentId, Reschedule>>
pub(crate) async fn analyze_reschedule_plan( &self, reschedules: HashMap<FragmentId, WorkerReschedule>, options: RescheduleOptions, table_parallelisms: Option<&mut HashMap<TableId, TableParallelism>>, ) -> MetaResult<HashMap<FragmentId, Reschedule>>
Translates the high-level WorkerReschedule into the low-level reschedule plan Reschedule.
Returns (reschedule_fragment, applied_reschedules):
reschedule_fragment: the generated reschedule plan.
applied_reschedules: the changes that need to be updated to the meta store (pre_apply_reschedules, only for V1).
In the normal process of scaling, we use the returned values to build a Command::RescheduleFragment, which then flows through the barrier mechanism to perform scaling. The meta store is updated after the barrier is collected.
During recovery, we don’t need the barrier mechanism, and can directly use the returned values to update meta.
fn arrange_reschedules( &self, reschedule: &HashMap<FragmentId, WorkerReschedule>, ctx: &RescheduleContext, ) -> MetaResult<(HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>, HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>)>
sourcefn modify_actor_upstream_and_downstream(
ctx: &RescheduleContext,
fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
new_actor: &mut PbStreamActor,
) -> MetaResult<()>
fn modify_actor_upstream_and_downstream( ctx: &RescheduleContext, fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>, fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>, fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>, no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>, no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>, new_actor: &mut PbStreamActor, ) -> MetaResult<()>
Modifies the upstream and downstream actors of the newly created actor according to the overall changes; used to handle cascading updates.
pub async fn post_apply_reschedule( &self, reschedules: &HashMap<FragmentId, Reschedule>, table_parallelism: &HashMap<TableId, TableParallelism>, ) -> MetaResult<()>
pub async fn generate_table_resize_plan( &self, policy: TableResizePolicy, ) -> MetaResult<HashMap<FragmentId, WorkerReschedule>>
pub(crate) fn filter_unschedulable_workers( workers: &[WorkerNode], ) -> HashSet<WorkerId>
fn diff_worker_slot_changes( fragment_worker_slots: &BTreeMap<WorkerId, usize>, target_worker_slots: &BTreeMap<WorkerId, usize>, ) -> WorkerReschedule
fn build_no_shuffle_relation_index( actor_map: &HashMap<ActorId, CustomActorInfo>, no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>, no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>, )
fn build_fragment_dispatcher_index( actor_map: &HashMap<ActorId, CustomActorInfo>, fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>, )
pub fn resolve_no_shuffle_upstream_tables( fragment_ids: HashSet<FragmentId>, fragment_map: &HashMap<FragmentId, CustomFragmentInfo>, no_shuffle_source_fragment_ids: &HashSet<FragmentId>, no_shuffle_target_fragment_ids: &HashSet<FragmentId>, fragment_to_table: &HashMap<FragmentId, TableId>, table_parallelisms: &mut HashMap<TableId, TableParallelism>, ) -> MetaResult<()>
pub fn resolve_no_shuffle_upstream_fragments<T>( reschedule: &mut HashMap<FragmentId, T>, fragment_map: &HashMap<FragmentId, CustomFragmentInfo>, no_shuffle_source_fragment_ids: &HashSet<FragmentId>, no_shuffle_target_fragment_ids: &HashSet<FragmentId>, ) -> MetaResult<()>
Auto Trait Implementations§
impl !Freeze for ScaleController
impl !RefUnwindSafe for ScaleController
impl Send for ScaleController
impl Sync for ScaleController
impl Unpin for ScaleController
impl !UnwindSafe for ScaleController
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
Causes self to use its Display implementation when Debug-formatted.
§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
§impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
source§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
source§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level.
source§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n.
source§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to MetricLevel::Debug and relabel_num set to 1.
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
Borrows self and passes that borrow into the pipe function. Read more
§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
Mutably borrows self and passes that borrow into the pipe function. Read more
§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
§impl<T> Pointable for T
impl<T> Pointable for T
§impl<Source> Sculptor<HNil, HNil> for Source
impl<Source> Sculptor<HNil, HNil> for Source
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value. Read more
§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value. Read more
§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value. Read more
§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value. Read more
§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value. Read more
§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value. Read more
§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.