struct ActorGraphBuildStateInner {
    actor_builders: BTreeMap<GlobalId<{ IdCategory::Actor }>, ActorBuilder>,
    building_locations: BTreeMap<GlobalId<{ IdCategory::Actor }>, WorkerSlotId>,
    external_changes: BTreeMap<GlobalId<{ IdCategory::Actor }>, ExternalChange>,
    external_locations: BTreeMap<GlobalId<{ IdCategory::Actor }>, WorkerSlotId>,
}
The actual mutable state used while building an actor graph.
As the fragments are visited in topological order, the actor builders and their scheduled locations are added to this state. Because the build runs on the complete graph, which also contains the existing (external) fragments, information about those fragments is recorded as well.
Fields
actor_builders: BTreeMap<GlobalId<{ IdCategory::Actor }>, ActorBuilder>
The builders of the actors to be built.
building_locations: BTreeMap<GlobalId<{ IdCategory::Actor }>, WorkerSlotId>
The scheduled locations of the actors to be built.
external_changes: BTreeMap<GlobalId<{ IdCategory::Actor }>, ExternalChange>
The required changes to the external actors. See ExternalChange.
external_locations: BTreeMap<GlobalId<{ IdCategory::Actor }>, WorkerSlotId>
The actual locations of the external actors.
Implementations
impl ActorGraphBuildStateInner
fn add_actor(
    &mut self,
    actor_id: GlobalId<{ IdCategory::Actor }>,
    fragment_id: GlobalId<{ IdCategory::Fragment }>,
    worker_slot_id: WorkerSlotId,
    vnode_bitmap: Option<Bitmap>,
    node: Arc<StreamNode>,
)
Insert a newly generated actor and record its location.
The vnode_bitmap should be Some for the actors of hash-distributed fragments.
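As a rough illustration of the bookkeeping described above, the following sketch inserts a builder and records the scheduled location in one step. The type aliases, the simplified ActorBuilder, the BuildState name, and the duplicate-ID assertion are assumptions made for this example and are not taken from the real implementation; the StreamNode argument is omitted.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for `GlobalId`, `WorkerSlotId` and `Bitmap`.
type ActorId = u32;
type FragmentId = u32;
type WorkerSlotId = u64;
type Bitmap = Vec<bool>;

/// Simplified builder: only remembers the fragment and the vnode bitmap.
#[derive(Debug, PartialEq)]
struct ActorBuilder {
    fragment_id: FragmentId,
    vnode_bitmap: Option<Bitmap>,
}

#[derive(Default)]
struct BuildState {
    actor_builders: BTreeMap<ActorId, ActorBuilder>,
    building_locations: BTreeMap<ActorId, WorkerSlotId>,
}

impl BuildState {
    /// Insert a builder for a new actor and record where it is scheduled.
    fn add_actor(
        &mut self,
        actor_id: ActorId,
        fragment_id: FragmentId,
        worker_slot_id: WorkerSlotId,
        vnode_bitmap: Option<Bitmap>,
    ) {
        let builder = ActorBuilder { fragment_id, vnode_bitmap };
        // Assumption of this sketch: every actor ID is added at most once.
        let previous = self.actor_builders.insert(actor_id, builder);
        assert!(previous.is_none(), "actor {actor_id} added twice");
        self.building_locations.insert(actor_id, worker_slot_id);
    }
}
```

In this sketch an actor of a hash-distributed fragment would be added with Some(bitmap), and an actor of a singleton fragment with None.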
fn record_external_location(
    &mut self,
    actor_id: GlobalId<{ IdCategory::Actor }>,
    worker_slot_id: WorkerSlotId,
)
Record the location of an external actor.
fn new_hash_dispatcher(
    strategy: &DispatchStrategy,
    downstream_fragment_id: GlobalId<{ IdCategory::Fragment }>,
    downstream_actors: &[GlobalId<{ IdCategory::Actor }>],
    downstream_actor_mapping: ActorMapping,
) -> Dispatcher
Create a new hash dispatcher.
fn new_normal_dispatcher(
    strategy: &DispatchStrategy,
    downstream_fragment_id: GlobalId<{ IdCategory::Fragment }>,
    downstream_actors: &[GlobalId<{ IdCategory::Actor }>],
) -> Dispatcher
Create a new dispatcher for non-hash types.
fn add_dispatcher(
    &mut self,
    actor_id: GlobalId<{ IdCategory::Actor }>,
    dispatcher: Dispatcher,
)
Add the new dispatcher for an actor.
- If the actor is to be built, the dispatcher will be added to the actor builder.
- If the actor is an external actor, the dispatcher will be added to the external changes.
fn add_upstream(
    &mut self,
    actor_id: GlobalId<{ IdCategory::Actor }>,
    upstream: ActorUpstream,
)
Add the new upstream for an actor.
- If the actor is to be built, the upstream will be added to the actor builder.
- If the actor is an external actor, the upstream will be added to the external changes.
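The routing rule shared by add_dispatcher and add_upstream could be sketched as follows. All types here are simplified, hypothetical stand-ins (the field names new_dispatchers and new_upstreams are assumptions), and treating any actor without a builder as external is a simplification of this example.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins; the real types carry much more information.
type ActorId = u32;

#[derive(Debug, Clone, PartialEq)]
struct Dispatcher {
    downstream_actors: Vec<ActorId>,
}

#[derive(Debug, Clone, PartialEq)]
struct ActorUpstream {
    upstream_actors: Vec<ActorId>,
}

/// Builder of an actor that is about to be created.
#[derive(Debug, Default)]
struct ActorBuilder {
    new_dispatchers: Vec<Dispatcher>,
    new_upstreams: Vec<ActorUpstream>,
}

/// Changes to apply to an actor that already exists.
#[derive(Debug, Default)]
struct ExternalChange {
    new_dispatchers: Vec<Dispatcher>,
    new_upstreams: Vec<ActorUpstream>,
}

#[derive(Default)]
struct BuildState {
    actor_builders: BTreeMap<ActorId, ActorBuilder>,
    external_changes: BTreeMap<ActorId, ExternalChange>,
}

impl BuildState {
    fn add_dispatcher(&mut self, actor_id: ActorId, dispatcher: Dispatcher) {
        if let Some(builder) = self.actor_builders.get_mut(&actor_id) {
            // The actor is to be built: extend its builder.
            builder.new_dispatchers.push(dispatcher);
        } else {
            // Otherwise treat it as external and record the change.
            let change = self.external_changes.entry(actor_id).or_default();
            change.new_dispatchers.push(dispatcher);
        }
    }

    fn add_upstream(&mut self, actor_id: ActorId, upstream: ActorUpstream) {
        if let Some(builder) = self.actor_builders.get_mut(&actor_id) {
            builder.new_upstreams.push(upstream);
        } else {
            let change = self.external_changes.entry(actor_id).or_default();
            change.new_upstreams.push(upstream);
        }
    }
}
```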
fn get_location(
    &self,
    actor_id: GlobalId<{ IdCategory::Actor }>,
) -> WorkerSlotId
Get the location of an actor. This looks up the location maps of both the actors to be built and the external actors.
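A minimal sketch of such a two-map lookup is shown below. The Locations struct and the type aliases are hypothetical, and returning an Option is a choice made for this example; the documented method returns a WorkerSlotId directly, and how it treats an unknown actor is not stated here.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for `GlobalId` and `WorkerSlotId`.
type ActorId = u32;
type WorkerSlotId = u64;

#[derive(Default)]
struct Locations {
    /// Scheduled locations of the actors to be built.
    building: BTreeMap<ActorId, WorkerSlotId>,
    /// Actual locations of the external actors.
    external: BTreeMap<ActorId, WorkerSlotId>,
}

impl Locations {
    /// Look in the map of actors to be built first, then in the external map.
    fn get_location(&self, actor_id: ActorId) -> Option<WorkerSlotId> {
        self.building
            .get(&actor_id)
            .or_else(|| self.external.get(&actor_id))
            .copied()
    }
}
```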
fn add_link<'a>(
    &mut self,
    upstream: FragmentLinkNode<'a>,
    downstream: FragmentLinkNode<'a>,
    edge: &'a StreamFragmentEdge,
)
Add a "link" between two fragments in the graph.
The edge will be expanded into multiple (downstream - upstream) pairs for the actors in the two fragments, based on the distribution and the dispatch strategy. These pairs are eventually transformed into Dispatcher and Merge nodes when the actors are built.
If either fragment is an existing (external) one, the info for its actors will be recorded in external_changes instead of the actor builders.
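To make the expansion step concrete, here is a simplified sketch of turning one fragment-level edge into (downstream, upstream) actor pairs. The Strategy enum, the expand_edge function, and the pairing rules are assumptions made for illustration: one-to-one pairing by position stands in for a no-shuffle style edge, and all-to-all pairing stands in for hash or broadcast style edges. The real method also takes the distribution and actor locations into account, which this sketch ignores.

```rust
// Hypothetical stand-in for `GlobalId<{ IdCategory::Actor }>`.
type ActorId = u32;

/// Simplified dispatch strategies for this sketch only.
#[derive(Clone, Copy)]
enum Strategy {
    /// Each downstream actor reads from exactly one upstream actor.
    OneToOne,
    /// Each downstream actor reads from every upstream actor.
    AllToAll,
}

/// Expand one fragment-level edge into (downstream, upstream) actor pairs.
fn expand_edge(
    upstream: &[ActorId],
    downstream: &[ActorId],
    strategy: Strategy,
) -> Vec<(ActorId, ActorId)> {
    match strategy {
        Strategy::OneToOne => {
            // Assumption: both fragments have the same parallelism.
            assert_eq!(upstream.len(), downstream.len());
            downstream
                .iter()
                .copied()
                .zip(upstream.iter().copied())
                .collect()
        }
        Strategy::AllToAll => downstream
            .iter()
            .flat_map(|&d| upstream.iter().map(move |&u| (d, u)))
            .collect(),
    }
}
```

Each resulting pair would then contribute a dispatcher entry on the upstream actor and an upstream entry on the downstream actor.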
Trait Implementations
impl Default for ActorGraphBuildStateInner
fn default() -> ActorGraphBuildStateInner
Auto Trait Implementations
impl Freeze for ActorGraphBuildStateInner
impl RefUnwindSafe for ActorGraphBuildStateInner
impl Send for ActorGraphBuildStateInner
impl Sync for ActorGraphBuildStateInner
impl Unpin for ActorGraphBuildStateInner
impl UnwindSafe for ActorGraphBuildStateInner
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self>
where
    &'a Self: for<'a> IntoIterator,
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request.
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
impl<T, U, I> LiftInto<U, I> for T
where
    U: LiftFrom<T, I>,
impl<M> MetricVecRelabelExt for M
fn relabel(
    self,
    metric_level: MetricLevel,
    relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level.
fn relabel_n(
    self,
    metric_level: MetricLevel,
    relabel_threshold: MetricLevel,
    relabel_num: usize,
) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n.
fn relabel_debug_1(
    self,
    relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to MetricLevel::Debug and relabel_num set to 1.
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(
    &'a mut self,
    func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<Source> Sculptor<HNil, HNil> for Source
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.