pub type Barrier = BarrierInner<BarrierMutationType>;
Aliased Type
struct Barrier {
pub epoch: EpochPair,
pub mutation: Option<Arc<Mutation>>,
pub kind: BarrierKind,
pub tracing_context: TracingContext,
pub passed_actors: Vec<u32>,
}
Fields
epoch: EpochPair
mutation: Option<Arc<Mutation>>
kind: BarrierKind
tracing_context: TracingContext
Tracing context for the current epoch of this barrier.
passed_actors: Vec<u32>
The actors that this barrier has passed locally. Used for debugging only.
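For orientation, a minimal sketch of reading these fields directly (assuming, as in risingwave_common, that EpochPair exposes curr and prev epochs and that BarrierKind derives Debug):

fn describe(barrier: &Barrier) {
    // The epoch pair carried by this barrier; `curr` is the epoch it commits.
    let curr = barrier.epoch.curr;
    // Whether a configuration-change mutation is attached.
    let has_mutation = barrier.mutation.is_some();
    println!("epoch {curr}, mutation attached: {has_mutation}, kind: {:?}", barrier.kind);
}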
Implementations
impl Barrier
pub fn into_dispatcher(self) -> DispatcherBarrier
pub fn with_mutation(self, mutation: Mutation) -> Self
pub fn with_stop(self) -> Self
pub fn is_with_stop_mutation(&self) -> bool
Whether this barrier carries a stop mutation.
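For example, a barrier tagged via with_stop will report this (a minimal sketch using only the methods above):

// Sketch: with_stop attaches a stop mutation, which this predicate observes.
fn tag_stop(barrier: Barrier) -> Barrier {
    let stopped = barrier.with_stop();
    debug_assert!(stopped.is_with_stop_mutation());
    stopped
}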
pub fn is_stop(&self, actor_id: ActorId) -> bool
Whether this barrier is to stop the actor with actor_id.
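A typical caller is an executor's barrier handler, which exits its loop once told to stop (a sketch; taking actor_id from the executor's context is an assumption):

// Sketch: decide whether this actor should keep running after the barrier.
fn should_continue(barrier: &Barrier, actor_id: ActorId) -> bool {
    !barrier.is_stop(actor_id)
}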
pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]>
Get the initial split assignments for the actor with actor_id.
This should only be called on the initial barrier received by the executor. It must be:
- Add mutation when it’s a new streaming job, or recovery.
- Update mutation when it’s created for scaling.
- AddAndUpdate mutation when it’s created for sink-into-table.
Note that SourceChangeSplit is not included, because it’s only used for changing splits of existing executors.
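A source executor might seed its split state from its first barrier roughly like this (a sketch; cloning into an owned Vec assumes SplitImpl: Clone):

// Sketch: read the initial split assignment on the first barrier only.
fn initial_splits(first_barrier: &Barrier, actor_id: ActorId) -> Vec<SplitImpl> {
    first_barrier
        .initial_split_assignment(actor_id)
        .map(|splits| splits.to_vec()) // assumes SplitImpl: Clone
        .unwrap_or_default()
}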
pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>>
Get all actors that are to be stopped (dropped) by this barrier.
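This is the set-level counterpart of is_stop. For example, an upstream side could prune its outputs to all stopped actors (a sketch; the Output type and the outputs map are hypothetical):

use std::collections::HashMap;

// Sketch: drop outputs to every actor stopped by this barrier.
// `Output` and the `outputs` map are hypothetical, for illustration only.
fn prune_outputs(barrier: &Barrier, outputs: &mut HashMap<ActorId, Output>) {
    if let Some(stopped) = barrier.all_stop_actors() {
        outputs.retain(|actor_id, _| !stopped.contains(actor_id));
    }
}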
pub fn is_newly_added(&self, actor_id: ActorId) -> bool
Whether this barrier is to newly add the actor with actor_id. This is used for Chain and Values to decide whether to output the existing (historical) data.
By “newly”, we mean the actor belongs to a subgraph of a new streaming job. That is, actors added for scaling are not included.
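A backfill-style executor could consult this on its first barrier (a sketch of the decision described above):

// Sketch: only a newly created actor (new streaming job) outputs
// historical data; actors added by scaling skip it.
fn need_historical_output(first_barrier: &Barrier, actor_id: ActorId) -> bool {
    first_barrier.is_newly_added(actor_id)
}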
pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool
Whether this barrier adds a new downstream fragment for the actor with upstream_actor_id.
Use case
Some optimizations are applied when an actor doesn’t have any downstreams (“standalone” actors).
- Pause a standalone shared SourceExecutor.
- Disable a standalone MaterializeExecutor’s conflict check.
This is implemented by checking actor_context.initial_dispatch_num on startup, and checking has_more_downstream_fragments on each barrier to see whether the optimization needs to be turned off.
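A sketch of that startup-plus-barrier pattern (the StandaloneOpt wrapper is illustrative; only initial_dispatch_num and has_more_downstream_fragments come from the description above):

// Sketch of the pattern described above; the struct itself is illustrative.
struct StandaloneOpt {
    enabled: bool,
}

impl StandaloneOpt {
    // On startup: optimize only if the actor begins with no downstream dispatch.
    fn on_startup(initial_dispatch_num: usize) -> Self {
        Self { enabled: initial_dispatch_num == 0 }
    }

    // On each barrier: turn the optimization off once a downstream
    // fragment is added for this actor.
    fn on_barrier(&mut self, barrier: &Barrier, actor_id: ActorId) {
        if self.enabled && barrier.has_more_downstream_fragments(actor_id) {
            self.enabled = false;
        }
    }
}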
Some special cases not included
Note that this is not has_new_downstream_actor/fragment. For our use case, we only care about the number of downstream fragments (more precisely, their existence).
- When scaling, the number of downstream actors is changed, and they are “new”, but the downstream fragments are not changed.
- When ALTER TABLE sink_into_table, the fragment is replaced with a “new” one, but the number is not changed.
pub fn is_pause_on_startup(&self) -> bool
Whether this barrier requires the executor to pause its data stream on startup.
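Honoring this on the first barrier might look as follows (a sketch; DataStream and its pause hook are hypothetical):

// Sketch: pause the data stream if the first barrier requests it.
// `DataStream` and `stream.pause()` are hypothetical, for illustration only.
fn apply_startup_pause(first_barrier: &Barrier, stream: &mut DataStream) {
    if first_barrier.is_pause_on_startup() {
        stream.pause();
    }
}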
pub fn as_update_merge(&self, actor_id: ActorId, upstream_fragment_id: FragmentId) -> Option<&MergeUpdate>
Returns the MergeUpdate if this barrier is to update the merge executors for the actor with actor_id.
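A merge executor might react to this as follows (a sketch; MergeState and apply_merge_update are hypothetical stand-ins for the executor's real rewiring logic):

// Sketch: rewire upstream inputs when this barrier carries a MergeUpdate for us.
fn maybe_update_merge(
    barrier: &Barrier,
    actor_id: ActorId,
    upstream_fragment_id: FragmentId,
    merge: &mut MergeState, // hypothetical executor state
) {
    if let Some(update) = barrier.as_update_merge(actor_id, upstream_fragment_id) {
        merge.apply_merge_update(update); // hypothetical method
    }
}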
pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>>
Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor with actor_id.
In practice, this vnode bitmap update only matters for record-access validation in distributed executors, since the read/write pattern never spans multiple vnodes.
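Adopting the new bitmap is then a simple swap (a sketch; keeping the bitmap in an Arc on the executor is an assumption):

use std::sync::Arc;

// Sketch: swap in the new vnode bitmap carried by this barrier, if any.
fn maybe_update_vnodes(barrier: &Barrier, actor_id: ActorId, vnodes: &mut Arc<Bitmap>) {
    if let Some(new_vnodes) = barrier.as_update_vnode_bitmap(actor_id) {
        *vnodes = new_vnodes;
    }
}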
pub fn get_curr_epoch(&self) -> Epoch
pub fn tracing_context(&self) -> &TracingContext
Retrieve the tracing context for the current epoch of this barrier.
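For example, a per-epoch span can be parented on this context (a sketch; TracingContext::attach as the linking API and Epoch being a u64 newtype are assumptions, not verified API):

// Sketch: parent a per-epoch tracing span on the barrier's context.
// `attach` and `Epoch(u64)` are assumptions here; check TracingContext's docs.
fn epoch_span(barrier: &Barrier) -> tracing::Span {
    let span = tracing::info_span!("epoch", epoch = barrier.get_curr_epoch().0);
    barrier.tracing_context().attach(span)
}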