pub enum Command {
    Flush,
    Pause(PausedReason),
    Resume(PausedReason),
    DropStreamingJobs {
        table_fragments_ids: HashSet<TableId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
    },
    MergeSnapshotBackfillStreamingJobs(HashMap<TableId, (SnapshotBackfillInfo, InflightStreamingJobInfo)>),
    RescheduleFragment {
        reschedules: HashMap<FragmentId, Reschedule>,
        table_parallelism: HashMap<TableId, TableParallelism>,
        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
    },
    ReplaceTable(ReplaceTablePlan),
    SourceSplitAssignment(SplitAssignment),
    Throttle(ThrottleConfig),
    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },
    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },
}
Command is the input of crate::barrier::worker::GlobalBarrierWorker. For each command, the worker builds a different barrier to send, and may perform different follow-up work after the barrier is collected.
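As a rough orientation, the variant descriptions on this page pair each command with the kind of barrier it generates. The sketch below restates that pairing as a lookup. CommandKind, BarrierKind and documented_barrier are hypothetical names introduced for illustration only; they are payload-free stand-ins, not the crate's types, and MergeSnapshotBackfillStreamingJobs is left out because this page does not describe its barrier.

```rust
/// Hypothetical, payload-free mirror of the documented `Command` variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CommandKind {
    Flush,
    Pause,
    Resume,
    DropStreamingJobs,
    CreateStreamingJob,
    RescheduleFragment,
    ReplaceTable,
    SourceSplitAssignment,
    Throttle,
    CreateSubscription,
    DropSubscription,
}

/// Hypothetical labels for the barrier kinds named in the variant docs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BarrierKind {
    Checkpoint,
    Pause,
    Resume,
    Stop,
    Add,
    Update,
    Splits,
    Throttle,
    CreateSubscription,
    DropSubscriptions,
}

/// Returns the barrier kind that the documentation associates with a command.
pub fn documented_barrier(cmd: CommandKind) -> BarrierKind {
    match cmd {
        CommandKind::Flush => BarrierKind::Checkpoint,
        CommandKind::Pause => BarrierKind::Pause,
        CommandKind::Resume => BarrierKind::Resume,
        CommandKind::DropStreamingJobs => BarrierKind::Stop,
        CommandKind::CreateStreamingJob => BarrierKind::Add,
        // Rescheduling and table replacement both use an Update barrier.
        CommandKind::RescheduleFragment | CommandKind::ReplaceTable => BarrierKind::Update,
        CommandKind::SourceSplitAssignment => BarrierKind::Splits,
        CommandKind::Throttle => BarrierKind::Throttle,
        CommandKind::CreateSubscription => BarrierKind::CreateSubscription,
        CommandKind::DropSubscription => BarrierKind::DropSubscriptions,
    }
}
```

Pause and Resume are conditional: per their descriptions, the barrier carries no mutation when the cluster's paused state does not match.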
Variants§
Flush
Flush command generates a checkpoint barrier. After the barrier is collected and committed, all messages before the checkpoint barrier should have been committed.
Pause(PausedReason)
Pause command generates a Pause barrier with the provided PausedReason only if the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
Resume(PausedReason)
Resume command generates a Resume barrier with the provided PausedReason only if the cluster is paused with the same reason. Otherwise, a barrier with no mutation will be generated.
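The Pause and Resume rules can be sketched as a single decision over the current paused state. This is a minimal illustration with stand-in types, not the crate's implementation: the PausedReason variants, the reduced Mutation enum, PauseCommand and pause_mutation are all assumptions made for the example.

```rust
/// Stand-in for the real `PausedReason`; the variant names are assumptions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PausedReason {
    ConfigChange,
    Manual,
}

/// Stand-in for the mutation attached to a barrier, reduced to two kinds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mutation {
    Pause,
    Resume,
}

/// Hypothetical subset of `Command` containing only the two variants of interest.
pub enum PauseCommand {
    Pause(PausedReason),
    Resume(PausedReason),
}

/// Returns the mutation to attach, or `None` for a barrier with no mutation.
pub fn pause_mutation(cmd: &PauseCommand, current: Option<PausedReason>) -> Option<Mutation> {
    match cmd {
        // Pause takes effect only if the cluster is not already paused.
        PauseCommand::Pause(_) if current.is_none() => Some(Mutation::Pause),
        // Resume takes effect only if the cluster is paused with the same reason.
        PauseCommand::Resume(reason) if current == Some(*reason) => Some(Mutation::Resume),
        // Every other combination yields a barrier without a mutation.
        _ => None,
    }
}
```

Under these assumptions, a Resume whose reason differs from the current one is a no-op rather than an error.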
DropStreamingJobs
DropStreamingJobs command generates a Stop barrier to stop the given Vec<ActorId>. The catalog has already ensured, by reference counts, that these streaming jobs are safe to drop.
Barriers from the actors to be dropped will STILL be collected. After the barrier is collected, it notifies the local stream manager of compute nodes to drop actors, and then deletes the table fragments info from meta store.
Fields
unregistered_fragment_ids: HashSet<FragmentId>
CreateStreamingJob
CreateStreamingJob command generates an Add barrier from the given info.
Barriers from the actors to be created, which are marked as Inactive at first, will STILL be collected, since the barrier should be passed through.
After the barrier is collected, these newly created actors will be marked as Running, and the table fragments info is added to meta store. However, the creating progress will last for a while until the finish channel is signaled; then the state of TableFragments will be set to Created.
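The lifecycle described above can be sketched as two independent transitions: actors move from Inactive to Running when the Add barrier is collected, and the job itself becomes Created only later, once the finish channel is signaled. The types below are hypothetical stand-ins written for this illustration; in particular, the Creating state and the method names are assumptions and do not come from the crate.

```rust
/// Stand-in actor states, named after the states mentioned in the docs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ActorState {
    Inactive,
    Running,
}

/// Stand-in job states; `Creating` is an assumed name for the pre-`Created` state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JobState {
    Creating,
    Created,
}

/// Hypothetical bookkeeping for one streaming job that is being created.
#[derive(Debug)]
pub struct CreatingJob {
    pub actors: Vec<ActorState>,
    pub state: JobState,
}

impl CreatingJob {
    /// All actors start out Inactive and the job starts out Creating.
    pub fn new(actor_count: usize) -> Self {
        Self {
            actors: vec![ActorState::Inactive; actor_count],
            state: JobState::Creating,
        }
    }

    /// Collecting the Add barrier marks every new actor as Running.
    pub fn on_barrier_collected(&mut self) {
        for actor in &mut self.actors {
            *actor = ActorState::Running;
        }
    }

    /// Signaling the finish channel marks the job as Created.
    pub fn on_finish_signaled(&mut self) {
        self.state = JobState::Created;
    }
}
```

Keeping the two transitions separate mirrors the point made in the description: running actors do not imply that the job has finished creating.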
MergeSnapshotBackfillStreamingJobs(HashMap<TableId, (SnapshotBackfillInfo, InflightStreamingJobInfo)>)
RescheduleFragment
Reschedule command generates an Update barrier from the Reschedule of each fragment. Mainly used for scaling and migration.
Which actors' barriers should be collected, and the post-collection behavior of this command, are very similar to the Create and Drop commands, for added and removed actors, respectively.
Fields
reschedules: HashMap<FragmentId, Reschedule>
table_parallelism: HashMap<TableId, TableParallelism>
fragment_actors: HashMap<FragmentId, HashSet<ActorId>>
ReplaceTable(ReplaceTablePlan)
ReplaceTable command generates an Update barrier with the given merge_updates. This essentially switches the downstream of the old table fragments to the new ones and drops the old table fragments. Used for table schema change.
This can be treated as a special case of RescheduleFragment in which the upstream fragments of the Merge executors are additionally changed.
SourceSplitAssignment(SplitAssignment)
SourceSplitAssignment generates a Splits barrier for pushing initialized splits or changed splits.
Throttle(ThrottleConfig)
Throttle command generates a Throttle barrier with the given throttle config to change the rate_limit of the FlowControl Executor after StreamScan or Source.
CreateSubscription
CreateSubscription command generates a CreateSubscriptionMutation to notify the materialize executor to start storing old values for the subscription.
DropSubscription
DropSubscription command generates a DropSubscriptionsMutation to notify the materialize executor to stop storing old values when no subscription depends on it.
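One way to picture the create/drop pairing is a set of live subscription ids per upstream table: old values are stored while the set is non-empty and no longer stored once the last subscription is dropped. The sketch below is only an illustration of that rule; OldValueTracker and its methods are hypothetical and are not the materialize executor's API.

```rust
use std::collections::HashSet;

/// Hypothetical tracker for the subscriptions that depend on one upstream table.
#[derive(Debug, Default)]
pub struct OldValueTracker {
    subscriptions: HashSet<u32>,
}

impl OldValueTracker {
    /// Mirrors the effect of a create-subscription mutation: register a dependent.
    pub fn create_subscription(&mut self, subscription_id: u32) {
        self.subscriptions.insert(subscription_id);
    }

    /// Mirrors the effect of a drop-subscriptions mutation: unregister a dependent.
    pub fn drop_subscription(&mut self, subscription_id: u32) {
        self.subscriptions.remove(&subscription_id);
    }

    /// Old values are stored only while at least one subscription depends on the table.
    pub fn stores_old_value(&self) -> bool {
        !self.subscriptions.is_empty()
    }
}
```

Dropping one of several subscriptions therefore leaves old-value storage enabled, which matches the "when no subscription depends on it" condition.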
Implementations§
impl Command
pub fn pause(reason: PausedReason) -> Self
pub fn resume(reason: PausedReason) -> Self
pub fn cancel(table_fragments: &TableFragments) -> Self
pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>>
pub fn should_pause_inject_barrier(&self) -> bool
Returns true if the command needs to send a barrier that modifies actor configuration, in which case barrier injection is paused.
pub fn need_checkpoint(&self) -> bool
impl Command
pub fn to_mutation(&self, current_paused_reason: Option<PausedReason>) -> Option<Mutation>
Generate a mutation for the given command.
pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>>
fn generate_update_mutation_for_replace_table(old_table_fragments: &TableFragments, merge_updates: &[MergeUpdate], dispatchers: &HashMap<ActorId, Vec<Dispatcher>>, init_split_assignment: &SplitAssignment) -> Option<Mutation>
pub fn next_paused_reason(this: Option<&Self>, current_paused_reason: Option<PausedReason>) -> Option<PausedReason>
Returns the paused reason after executing the current command.
pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_
For CancelStreamingJob, returns the table id of the target table.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Command
impl RefUnwindSafe for Command
impl Send for Command
impl Sync for Command
impl Unpin for Command
impl UnwindSafe for Command
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self> where &'a Self: for<'a> IntoIterator
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
impl<T, U, I> LiftInto<U, I> for T where U: LiftFrom<T, I>
impl<M> MetricVecRelabelExt for M
fn relabel(self, metric_level: MetricLevel, relabel_threshold: MetricLevel) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level.
fn relabel_n(self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n.
fn relabel_debug_1(self, relabel_threshold: MetricLevel) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to MetricLevel::Debug and relabel_num set to 1.
impl<T> Pipe for T where T: ?Sized
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<Source> Sculptor<HNil, HNil> for Source
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.