pub enum Command {
Show 17 variants
Flush,
Pause,
Resume,
DropStreamingJobs {
streaming_job_ids: HashSet<JobId>,
actors: Vec<ActorId>,
unregistered_state_table_ids: HashSet<TableId>,
unregistered_fragment_ids: HashSet<FragmentId>,
dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
},
CreateStreamingJob {
info: CreateStreamingJobCommandInfo,
job_type: CreateStreamingJobType,
cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
},
MergeSnapshotBackfillStreamingJobs(HashMap<JobId, (HashSet<TableId>, InflightStreamingJobInfo)>),
RescheduleFragment {
reschedules: HashMap<FragmentId, Reschedule>,
fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
},
ReplaceStreamJob(ReplaceStreamJobPlan),
SourceChangeSplit(SplitState),
Throttle(ThrottleConfig),
CreateSubscription {
subscription_id: u32,
upstream_mv_table_id: TableId,
retention_second: u64,
},
DropSubscription {
subscription_id: u32,
upstream_mv_table_id: TableId,
},
ConnectorPropsChange(ConnectorPropsChange),
StartFragmentBackfill {
fragment_ids: Vec<FragmentId>,
},
Refresh {
table_id: TableId,
associated_source_id: TableId,
},
ListFinish {
table_id: TableId,
associated_source_id: TableId,
},
LoadFinish {
table_id: TableId,
associated_source_id: TableId,
},
}Expand description
Command is the input of crate::barrier::worker::GlobalBarrierWorker. For different commands,
it will build different barriers to send,
and may do different stuffs after the barrier is collected.
Variants§
Flush
Flush command will generate a checkpoint barrier. After the barrier is collected and committed
all messages before the checkpoint barrier should have been committed.
Pause
Pause command generates a Pause barrier only if
the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
Resume
Resume command generates a Resume barrier only
if the cluster is paused with the same reason. Otherwise, a barrier with no mutation
will be generated.
DropStreamingJobs
DropStreamingJobs command generates a Stop barrier to stop the given
Vec<ActorId>. The catalog has ensured that these streaming jobs are safe to be
dropped by reference counts before.
Barriers from the actors to be dropped will STILL be collected. After the barrier is collected, it notifies the local stream manager of compute nodes to drop actors, and then delete the job fragments info from meta store.
Fields
unregistered_fragment_ids: HashSet<FragmentId>dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>CreateStreamingJob
CreateStreamingJob command generates a Add barrier by given info.
Barriers from the actors to be created, which is marked as Inactive at first, will STILL
be collected since the barrier should be passthrough.
After the barrier is collected, these newly created actors will be marked as Running. And
it adds the job fragments info to meta store. However, the creating progress will last
for a while until the finish channel is signaled, then the state of TableFragments
will be set to Created.
MergeSnapshotBackfillStreamingJobs(HashMap<JobId, (HashSet<TableId>, InflightStreamingJobInfo)>)
RescheduleFragment
Reschedule command generates a Update barrier by the Reschedule of each fragment.
Mainly used for scaling and migration.
Barriers from which actors should be collected, and the post behavior of this command are
very similar to Create and Drop commands, for added and removed actors, respectively.
Fields
reschedules: HashMap<FragmentId, Reschedule>fragment_actors: HashMap<FragmentId, HashSet<ActorId>>ReplaceStreamJob(ReplaceStreamJobPlan)
ReplaceStreamJob command generates a Update barrier with the given replace_upstream. This is
essentially switching the downstream of the old job fragments to the new ones, and
dropping the old job fragments. Used for schema change.
This can be treated as a special case of RescheduleFragment, while the upstream fragment
of the Merge executors are changed additionally.
SourceChangeSplit(SplitState)
SourceChangeSplit generates a Splits barrier for pushing initialized splits or
changed splits.
Throttle(ThrottleConfig)
Throttle command generates a Throttle barrier with the given throttle config to change
the rate_limit of FlowControl Executor after StreamScan or Source.
CreateSubscription
CreateSubscription command generates a CreateSubscriptionMutation to notify
materialize executor to start storing old value for subscription.
DropSubscription
DropSubscription command generates a DropSubscriptionsMutation to notify
materialize executor to stop storing old value when there is no
subscription depending on it.
ConnectorPropsChange(ConnectorPropsChange)
StartFragmentBackfill
StartFragmentBackfill command will trigger backfilling for specified scans by fragment_id.
Fields
fragment_ids: Vec<FragmentId>Refresh
Refresh command generates a barrier to refresh a table by truncating state
and reloading data from source.
ListFinish
LoadFinish
Implementations§
Source§impl Command
impl Command
pub fn pause() -> Self
pub fn resume() -> Self
pub(crate) fn fragment_changes( &self, ) -> Option<(Option<JobId>, HashMap<FragmentId, CommandFragmentChanges>)>
pub fn need_checkpoint(&self) -> bool
Source§impl Command
impl Command
Sourcepub(super) fn to_mutation(
&self,
is_currently_paused: bool,
edges: &mut Option<FragmentEdgeBuildResult>,
control_stream_manager: &ControlStreamManager,
) -> Option<Mutation>
pub(super) fn to_mutation( &self, is_currently_paused: bool, edges: &mut Option<FragmentEdgeBuildResult>, control_stream_manager: &ControlStreamManager, ) -> Option<Mutation>
Generate a mutation for the given command.
edges contains the information of dispatchers of DispatchExecutor and actor_upstreamss of MergeNode
pub(super) fn actors_to_create( &self, graph_info: &InflightDatabaseInfo, edges: &mut Option<FragmentEdgeBuildResult>, control_stream_manager: &ControlStreamManager, ) -> Option<StreamJobActorsToCreate>
fn generate_update_mutation_for_replace_table( dropped_actors: impl IntoIterator<Item = ActorId>, merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>, dispatchers: FragmentActorDispatchers, init_split_assignment: &SplitAssignment, cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration, auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>, ) -> Option<Mutation>
Sourcepub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_
pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_
For CancelStreamingJob, returns the table id of the target table.
Source§impl Command
impl Command
pub(super) fn collect_actor_upstreams( actor_dispatchers: impl Iterator<Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>)>, reschedule_dispatcher_update: Option<(&HashMap<FragmentId, Reschedule>, &HashMap<FragmentId, HashSet<ActorId>>)>, graph_info: &InflightDatabaseInfo, control_stream_manager: &ControlStreamManager, ) -> HashMap<ActorId, ActorUpstreams>
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Command
impl RefUnwindSafe for Command
impl Send for Command
impl Sync for Command
impl Unpin for Command
impl UnwindSafe for Command
Blanket Implementations§
§impl<T> AsAny for T
impl<T> AsAny for T
§fn any_ref(&self) -> &(dyn Any + Sync + Send + 'static)
fn any_ref(&self) -> &(dyn Any + Sync + Send + 'static)
dyn Any reference to the object: Read more§fn as_any(self: Arc<T>) -> Arc<dyn Any + Sync + Send>
fn as_any(self: Arc<T>) -> Arc<dyn Any + Sync + Send>
Arc<dyn Any> reference to the object: Read more§fn into_any(self: Box<T>) -> Box<dyn Any + Sync + Send>
fn into_any(self: Box<T>) -> Box<dyn Any + Sync + Send>
Box<dyn Any>: Read more§fn type_name(&self) -> &'static str
fn type_name(&self) -> &'static str
std::any::type_name, since Any does not provide it and
Any::type_id is useless as a debugging aid (its Debug is just a mess of hex digits).Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::Request§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level].§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n].§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n] with metric_level set to
MetricLevel::Debug and relabel_num set to 1.§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Scope for T
impl<T> Scope for T
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.§impl<T> ToRootSpan for Twhere
T: Display,
impl<T> ToRootSpan for Twhere
T: Display,
§fn to_root_span(&self) -> Span
fn to_root_span(&self) -> Span
Span] that can be used as the root of an await-tree.