pub struct StreamJobFragments {
stream_job_id: TableId,
state: State,
pub fragments: BTreeMap<FragmentId, Fragment>,
pub actor_status: BTreeMap<ActorId, ActorStatus>,
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
pub ctx: StreamContext,
pub assigned_parallelism: TableParallelism,
pub max_parallelism: usize,
}
Expand description
Fragments of a streaming job. Corresponds to PbTableFragments
.
(It was previously called TableFragments
due to historical reasons.)
We store whole fragments in a single column family as follow:
stream_job_id
=> StreamJobFragments
.
Fields§
§stream_job_id: TableId
The table id.
state: State
The state of the table fragments.
fragments: BTreeMap<FragmentId, Fragment>
The table fragments.
actor_status: BTreeMap<ActorId, ActorStatus>
The status of actors
actor_splits: HashMap<ActorId, Vec<SplitImpl>>
The splits of actors,
incl. both Source
and SourceBackfill
actors.
ctx: StreamContext
The streaming context associated with this stream plan and its fragments
assigned_parallelism: TableParallelism
The parallelism assigned to this table fragments
max_parallelism: usize
The max parallelism specified when the streaming job was created, i.e., expected vnode count.
The reason for persisting this value is mainly to check if a parallelism change (via ALTER .. SET PARALLELISM
) is valid, so that the behavior can be consistent with the creation of
the streaming job.
Note that the actual vnode count, denoted by vnode_count
in fragments
, may be different
from this value (see StreamFragmentGraph.max_parallelism
for more details.). As a result,
checking the parallelism change with this value can be inaccurate in some cases. However,
when generating resizing plans, we still take the vnode_count
of each fragment into account.
Implementations§
source§impl StreamJobFragments
impl StreamJobFragments
pub fn to_protobuf(&self) -> PbTableFragments
pub fn from_protobuf(prost: PbTableFragments) -> Self
source§impl StreamJobFragments
impl StreamJobFragments
sourcepub fn for_test(
table_id: TableId,
fragments: BTreeMap<FragmentId, Fragment>,
) -> Self
pub fn for_test( table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>, ) -> Self
Create a new TableFragments
with state of Initial
, with other fields empty.
sourcepub fn new(
stream_job_id: TableId,
fragments: BTreeMap<FragmentId, Fragment>,
actor_locations: &BTreeMap<ActorId, WorkerSlotId>,
ctx: StreamContext,
table_parallelism: TableParallelism,
max_parallelism: usize,
) -> Self
pub fn new( stream_job_id: TableId, fragments: BTreeMap<FragmentId, Fragment>, actor_locations: &BTreeMap<ActorId, WorkerSlotId>, ctx: StreamContext, table_parallelism: TableParallelism, max_parallelism: usize, ) -> Self
Create a new TableFragments
with state of Initial
, with the status of actors set to
Inactive
on the given workers.
pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_
pub fn fragments(&self) -> impl Iterator<Item = &Fragment>
sourcepub fn stream_job_id(&self) -> TableId
pub fn stream_job_id(&self) -> TableId
Returns the table id.
sourcepub fn is_created(&self) -> bool
pub fn is_created(&self) -> bool
Returns whether the table fragments is in Created
state.
sourcepub fn is_initial(&self) -> bool
pub fn is_initial(&self) -> bool
Returns whether the table fragments is in Initial
state.
sourcepub fn update_actors_state(&mut self, state: ActorState)
pub fn update_actors_state(&mut self, state: ActorState)
Update state of all actors
pub fn set_actor_splits_by_split_assignment( &mut self, split_assignment: SplitAssignment, )
pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId>
sourcepub fn actors(&self) -> Vec<StreamActor>
pub fn actors(&self) -> Vec<StreamActor>
Returns actors associated with this table.
sourcepub fn filter_actor_ids(
&self,
check_type: impl Fn(u32) -> bool + 'static,
) -> impl Iterator<Item = ActorId> + '_
pub fn filter_actor_ids( &self, check_type: impl Fn(u32) -> bool + 'static, ) -> impl Iterator<Item = ActorId> + '_
Returns the actor ids with the given fragment type.
sourcepub fn mview_actor_ids(&self) -> Vec<ActorId>
pub fn mview_actor_ids(&self) -> Vec<ActorId>
Returns mview actor ids.
sourcepub fn tracking_progress_actor_ids(
&self,
) -> Vec<(ActorId, BackfillUpstreamType)>
pub fn tracking_progress_actor_ids( &self, ) -> Vec<(ActorId, BackfillUpstreamType)>
Returns actor ids that need to be tracked when creating MV.
sourcepub fn mview_fragment(&self) -> Option<Fragment>
pub fn mview_fragment(&self) -> Option<Fragment>
Returns the fragment with the Mview
type flag.
pub fn source_fragment(&self) -> Option<Fragment>
pub fn sink_fragment(&self) -> Option<Fragment>
pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId>
sourcepub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>>
pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>>
Extract the fragments that include source executors that contains an external stream source, grouping by source id.
pub fn source_backfill_fragments( &self, ) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>>
sourcefn resolve_dependent_table(
stream_node: &StreamNode,
table_ids: &mut HashMap<TableId, usize>,
)
fn resolve_dependent_table( stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>, )
Resolve dependent table
sourcepub fn dependent_table_ids(&self) -> HashMap<TableId, usize>
pub fn dependent_table_ids(&self) -> HashMap<TableId, usize>
Returns a mapping of dependent table ids of the TableFragments
to their corresponding count.
sourcepub fn worker_actor_states(
&self,
) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>>
pub fn worker_actor_states( &self, ) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>>
Returns states of actors group by worker id.
sourcepub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>>
pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>>
Returns actor locations group by worker id.
sourcepub fn active_actors(&self) -> Vec<StreamActor>
pub fn active_actors(&self) -> Vec<StreamActor>
Returns the status of actors group by worker id.
pub fn actors_to_create(&self) -> HashMap<WorkerId, Vec<StreamActor>>
pub fn mv_table_id(&self) -> Option<u32>
sourcepub fn internal_tables(&self) -> BTreeMap<u32, Table>
pub fn internal_tables(&self) -> BTreeMap<u32, Table>
Retrieve the complete internal tables map of the whole graph.
Compared to crate::stream::StreamFragmentGraph::incomplete_internal_tables
,
the table catalogs returned here are complete, with all fields filled.
sourcepub fn all_tables(&self) -> BTreeMap<u32, Table>
pub fn all_tables(&self) -> BTreeMap<u32, Table>
internal_tables()
with additional table in Materialize
node.
fn collect_tables_inner( &self, internal_tables_only: bool, ) -> BTreeMap<u32, Table>
sourcepub fn internal_table_ids(&self) -> Vec<u32>
pub fn internal_table_ids(&self) -> Vec<u32>
Returns the internal table ids without the mview table.
sourcepub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_
pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_
Returns all internal table ids including the mview table.
sourcepub fn fill_expr_context(self) -> Self
pub fn fill_expr_context(self) -> Self
Fill the expr_context
in StreamActor
. Used for compatibility.
Trait Implementations§
source§impl Clone for StreamJobFragments
impl Clone for StreamJobFragments
source§fn clone(&self) -> StreamJobFragments
fn clone(&self) -> StreamJobFragments
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read moreAuto Trait Implementations§
impl Freeze for StreamJobFragments
impl RefUnwindSafe for StreamJobFragments
impl Send for StreamJobFragments
impl Sync for StreamJobFragments
impl Unpin for StreamJobFragments
impl UnwindSafe for StreamJobFragments
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
source§unsafe fn clone_to_uninit(&self, dst: *mut T)
unsafe fn clone_to_uninit(&self, dst: *mut T)
clone_to_uninit
)§impl<T> Conv for T
impl<T> Conv for T
§impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
§impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
source§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
source§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level
.source§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
.source§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
with metric_level
set to
MetricLevel::Debug
and relabel_num
set to 1.§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<Source> Sculptor<HNil, HNil> for Source
impl<Source> Sculptor<HNil, HNil> for Source
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.