risingwave_stream::executor

Type Alias Barrier

source
pub type Barrier = BarrierInner<BarrierMutationType>;

Aliased Type§

struct Barrier {
    pub epoch: EpochPair,
    pub mutation: Option<Arc<Mutation>>,
    pub kind: BarrierKind,
    pub tracing_context: TracingContext,
    pub passed_actors: Vec<u32>,
}

Fields§

§epoch: EpochPair§mutation: Option<Arc<Mutation>>§kind: BarrierKind§tracing_context: TracingContext

Tracing context for the current epoch of this barrier.

§passed_actors: Vec<u32>

The actors that this barrier has passed locally. Used for debugging only.

Implementations§

source§

impl Barrier

source

pub fn into_dispatcher(self) -> DispatcherBarrier

source

pub fn with_mutation(self, mutation: Mutation) -> Self

source

pub fn with_stop(self) -> Self

source

pub fn is_with_stop_mutation(&self) -> bool

Whether this barrier carries stop mutation.

source

pub fn is_stop(&self, actor_id: ActorId) -> bool

Whether this barrier is to stop the actor with actor_id.

source

pub fn initial_split_assignment( &self, actor_id: ActorId, ) -> Option<&[SplitImpl]>

Get the initial split assignments for the actor with actor_id.

This should only be called on the initial barrier received by the executor. It must be

  • Add mutation when it’s a new streaming job, or recovery.
  • Update mutation when it’s created for scaling.
  • AddAndUpdate mutation when it’s created for sink-into-table.

Note that SourceChangeSplit is not included, because it’s only used for changing splits of existing executors.

source

pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>>

Get all actors that to be stopped (dropped) by this barrier.

source

pub fn is_newly_added(&self, actor_id: ActorId) -> bool

Whether this barrier is to newly add the actor with actor_id. This is used for Chain and Values to decide whether to output the existing (historical) data.

By “newly”, we mean the actor belongs to a subgraph of a new streaming job. That is, actors added for scaling are not included.

source

pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool

Whether this barrier adds new downstream fragment for the actor with upstream_actor_id.

§Use case

Some optimizations are applied when an actor doesn’t have any downstreams (“standalone” actors).

  • Pause a standalone shared SourceExecutor.
  • Disable a standalone MaterializeExecutor’s conflict check.

This is implemented by checking actor_context.initial_dispatch_num on startup, and check has_more_downstream_fragments on barrier to see whether the optimization needs to be turned off.

§Some special cases not included

Note that this is not has_new_downstream_actor/fragment. For our use case, we only care about number of downstream fragments (more precisely, existence).

  • When scaling, the number of downstream actors is changed, and they are “new”, but downstream fragments is not changed.
  • When ALTER TABLE sink_into_table, the fragment is replaced with a “new” one, but the number is not changed.
source

pub fn is_pause_on_startup(&self) -> bool

Whether this barrier requires the executor to pause its data stream on startup.

source

pub fn is_resume(&self) -> bool

Whether this barrier is for resume.

source

pub fn as_update_merge( &self, actor_id: ActorId, upstream_fragment_id: FragmentId, ) -> Option<&MergeUpdate>

Returns the MergeUpdate if this barrier is to update the merge executors for the actor with actor_id.

source

pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>>

Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor with actor_id.

Actually, this vnode bitmap update is only useful for the record accessing validation for distributed executors, since the read/write pattern will never be across multiple vnodes.

source

pub fn get_curr_epoch(&self) -> Epoch

source

pub fn tracing_context(&self) -> &TracingContext

Retrieve the tracing context for the current epoch of this barrier.

source

pub fn added_subscriber_on_mv_table( &self, mv_table_id: TableId, ) -> impl Iterator<Item = u32> + '_

source§

impl Barrier

Trait Implementations§

source§

impl<'a> TryFrom<&'a MessageInner<Option<Arc<Mutation>>>> for &'a Barrier

source§

type Error = ()

The type returned in the event of a conversion error.
source§

fn try_from(m: &'a Message) -> Result<Self, Self::Error>

Performs the conversion.