pub struct StreamJobFragments {
pub stream_job_id: TableId,
pub state: State,
pub fragments: BTreeMap<FragmentId, Fragment>,
pub actor_status: BTreeMap<ActorId, ActorStatus>,
pub ctx: StreamContext,
pub assigned_parallelism: TableParallelism,
pub max_parallelism: usize,
}
Fragments of a streaming job. Corresponds to [PbTableFragments].
(For historical reasons, it was previously called TableFragments.)
We store the whole set of fragments in a single column family as follows:
stream_job_id => StreamJobFragments.
Fields§
stream_job_id: TableId
The table id.
state: State
The state of the table fragments.
fragments: BTreeMap<FragmentId, Fragment>
The table fragments.
actor_status: BTreeMap<ActorId, ActorStatus>
The status of actors.
ctx: StreamContext
The streaming context associated with this stream plan and its fragments.
assigned_parallelism: TableParallelism
The parallelism assigned to these table fragments.
max_parallelism: usize
The max parallelism specified when the streaming job was created, i.e., expected vnode count.
The reason for persisting this value is mainly to check if a parallelism change (via ALTER .. SET PARALLELISM) is valid, so that the behavior can be consistent with the creation of
the streaming job.
Note that the actual vnode count, denoted by vnode_count in fragments, may be different
from this value (see StreamFragmentGraph.max_parallelism for more details). As a result,
checking the parallelism change with this value can be inaccurate in some cases. However,
when generating resizing plans, we still take the vnode_count of each fragment into account.
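The validity check described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual implementation: `ParallelismRequest`, `check_parallelism_change`, and the error messages are hypothetical names introduced here, and the real `TableParallelism` type may have different variants.

```rust
/// Hypothetical stand-in for a requested parallelism (not the real `TableParallelism`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelismRequest {
    /// Let the system choose the parallelism.
    Adaptive,
    /// Use exactly this many parallel actors per fragment.
    Fixed(usize),
}

/// Checks a requested parallelism against the persisted `max_parallelism`.
/// Assumption: a fixed parallelism is valid when it is positive and does not
/// exceed `max_parallelism`; an adaptive request is always accepted.
pub fn check_parallelism_change(
    requested: ParallelismRequest,
    max_parallelism: usize,
) -> Result<(), String> {
    match requested {
        ParallelismRequest::Adaptive => Ok(()),
        ParallelismRequest::Fixed(0) => Err("parallelism must be positive".to_owned()),
        ParallelismRequest::Fixed(n) if n > max_parallelism => Err(format!(
            "requested parallelism {n} exceeds max parallelism {max_parallelism}"
        )),
        ParallelismRequest::Fixed(_) => Ok(()),
    }
}
```

Because the check only sees the job-level `max_parallelism`, it can accept a value that a fragment with a smaller `vnode_count` cannot honor, which is the inaccuracy the note above refers to.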
Implementations§
impl StreamJobFragments
pub(super) fn new_fragment_info<'a>( &'a self, assignment: &'a SplitAssignment, ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a
impl StreamJobFragments
pub fn to_protobuf( &self, fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>, fragment_dispatchers: &FragmentActorDispatchers, ) -> PbTableFragments
impl StreamJobFragments
pub fn for_test( table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>, ) -> Self
Creates a new StreamJobFragments in the Initial state, with all other fields empty.
pub fn new( stream_job_id: TableId, fragments: BTreeMap<FragmentId, Fragment>, actor_locations: &BTreeMap<ActorId, ActorAlignmentId>, ctx: StreamContext, table_parallelism: TableParallelism, max_parallelism: usize, ) -> Self
Creates a new StreamJobFragments in the Initial state, with the status of all actors set to
Inactive on the given workers.
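As a rough sketch of what "actors set to Inactive on the given workers" means, the snippet below builds an initial status map from actor locations. The types are simplified stand-ins defined only for this example: plain `u32` worker ids replace `ActorAlignmentId`, and `initial_actor_status` is a hypothetical helper, not part of the documented API.

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the real id and status types (assumptions).
type ActorId = u32;
type WorkerId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorState {
    Inactive,
    Running,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ActorStatus {
    pub worker_id: WorkerId,
    pub state: ActorState,
}

/// Maps every actor to an `Inactive` status on the worker it was assigned to.
pub fn initial_actor_status(
    actor_locations: &BTreeMap<ActorId, WorkerId>,
) -> BTreeMap<ActorId, ActorStatus> {
    actor_locations
        .iter()
        .map(|(&actor_id, &worker_id)| {
            let status = ActorStatus {
                worker_id,
                state: ActorState::Inactive,
            };
            (actor_id, status)
        })
        .collect()
}
```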
pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_
pub fn fragments(&self) -> impl Iterator<Item = &Fragment>
pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor]
pub fn stream_job_id(&self) -> TableId
Returns the table id.
pub fn is_created(&self) -> bool
Returns whether the table fragments are in the Created state.
pub fn update_actors_state(&mut self, state: ActorState)
Updates the state of all actors.
pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId>
pub fn tracking_progress_actor_ids( &self, ) -> Vec<(ActorId, BackfillUpstreamType)>
pub fn tracking_progress_actor_ids_impl( fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>, ) -> Vec<(ActorId, BackfillUpstreamType)>
Returns the ids of the actors whose progress needs to be tracked when creating a materialized view (MV).
pub fn root_fragment(&self) -> Option<Fragment>
pub fn mview_fragment(&self) -> Option<Fragment>
Returns the fragment with the Mview type flag.
pub fn source_fragment(&self) -> Option<Fragment>
pub fn sink_fragment(&self) -> Option<Fragment>
pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>>
Extracts the fragments whose source executors contain an external stream source, grouped by source id.
pub fn source_backfill_fragments( &self, ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>
pub fn source_backfill_fragments_impl( fragments: impl Iterator<Item = (FragmentId, &StreamNode)>, ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>
Returns a map of source_id -> (source_backfill_fragment_id, upstream_source_fragment_id).
Note: the fragment source_backfill_fragment_id may actually have multiple upstream fragments,
but only one of them is the upstream source fragment, which is the one returned.
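The shape of that return value can be illustrated with a small grouping routine. This is a sketch under assumptions, not the real traversal: the actual method walks each fragment's `StreamNode`, whereas here a hypothetical `BackfillNode` summary stands in for whatever that walk finds, and ids are plain `u32` values.

```rust
use std::collections::{BTreeSet, HashMap};

// Simplified stand-ins for the real id types (assumptions).
type FragmentId = u32;
type SourceId = u32;

/// Hypothetical summary of the source-backfill node found in a fragment.
pub struct BackfillNode {
    pub source_id: SourceId,
    pub upstream_source_fragment_id: FragmentId,
}

/// Groups `(backfill_fragment_id, upstream_source_fragment_id)` pairs by source id.
/// Fragments without a backfill node (`None`) are skipped.
pub fn group_backfill_fragments(
    fragments: impl Iterator<Item = (FragmentId, Option<BackfillNode>)>,
) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
    let mut result: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> = HashMap::new();
    for (fragment_id, node) in fragments {
        if let Some(node) = node {
            result
                .entry(node.source_id)
                .or_default()
                .insert((fragment_id, node.upstream_source_fragment_id));
        }
    }
    result
}
```

Using a `BTreeSet` for the pairs keeps each group deduplicated and in a deterministic order, which matches the return type in the signature above.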
pub fn union_fragment_for_table(&mut self) -> &mut Fragment
Find the table job’s Union fragment.
Panics if not found.
fn resolve_dependent_table( stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>, )
Resolve dependent table
pub fn upstream_table_counts(&self) -> HashMap<TableId, usize>
pub fn upstream_table_counts_impl( fragment_nodes: impl Iterator<Item = &StreamNode>, ) -> HashMap<TableId, usize>
Returns upstream table counts.
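One plausible reading of "upstream table counts" is the number of times each upstream table is referenced across the fragment plans. The sketch below follows that reading with a recursive walk that mirrors the `resolve_dependent_table(stream_node, table_ids)` signature. `PlanNode` is a hypothetical, simplified stand-in for `StreamNode`, and the counting rule itself is an assumption.

```rust
use std::collections::HashMap;

// Simplified stand-in for the real id type (assumption).
type TableId = u32;

/// Hypothetical, simplified plan node; the real `StreamNode` is far richer.
pub struct PlanNode {
    /// Set when this node reads from an upstream table.
    pub upstream_table: Option<TableId>,
    /// Child nodes of this node.
    pub inputs: Vec<PlanNode>,
}

/// Recursively records every upstream table referenced under `node`.
fn resolve_dependent_table(node: &PlanNode, table_ids: &mut HashMap<TableId, usize>) {
    if let Some(table_id) = node.upstream_table {
        *table_ids.entry(table_id).or_default() += 1;
    }
    for input in &node.inputs {
        resolve_dependent_table(input, table_ids);
    }
}

/// Counts upstream table references across the root nodes of all fragments.
pub fn upstream_table_counts<'a>(
    fragment_nodes: impl Iterator<Item = &'a PlanNode>,
) -> HashMap<TableId, usize> {
    let mut table_ids = HashMap::new();
    for node in fragment_nodes {
        resolve_dependent_table(node, &mut table_ids);
    }
    table_ids
}
```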
pub fn worker_actor_states( &self, ) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>>
Returns the states of actors, grouped by worker id.
pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>>
Returns the actor locations, grouped by worker id.
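Both grouping methods can be pictured as a single pass over the `actor_status` map. The sketch below is an illustration with simplified stand-in types defined only for this example (plain `u32` ids and a two-variant `ActorState`), not the actual implementation.

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the real id and status types (assumptions).
type ActorId = u32;
type WorkerId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorState {
    Inactive,
    Running,
}

#[derive(Debug, Clone, Copy)]
pub struct ActorStatus {
    pub worker_id: WorkerId,
    pub state: ActorState,
}

/// Groups `(actor_id, state)` pairs by the worker each actor is located on.
pub fn worker_actor_states(
    actor_status: &BTreeMap<ActorId, ActorStatus>,
) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
    let mut map: BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> = BTreeMap::new();
    for (&actor_id, status) in actor_status {
        map.entry(status.worker_id)
            .or_default()
            .push((actor_id, status.state));
    }
    map
}

/// Groups actor ids by the worker each actor is located on.
pub fn worker_actor_ids(
    actor_status: &BTreeMap<ActorId, ActorStatus>,
) -> BTreeMap<WorkerId, Vec<ActorId>> {
    let mut map: BTreeMap<WorkerId, Vec<ActorId>> = BTreeMap::new();
    for (&actor_id, status) in actor_status {
        map.entry(status.worker_id).or_default().push(actor_id);
    }
    map
}
```

Since the input is a `BTreeMap`, actors are visited in ascending id order, so each worker's list comes out sorted by actor id.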
pub fn active_actors(&self) -> Vec<StreamActor>
Returns the active actors.
pub fn actors_to_create( &self, ) -> impl Iterator<Item = (FragmentId, &StreamNode, impl Iterator<Item = (&StreamActor, WorkerId)> + '_)> + '_
pub fn mv_table_id(&self) -> Option<u32>
pub fn collect_tables( fragments: impl Iterator<Item = &Fragment>, ) -> BTreeMap<u32, Table>
pub fn internal_table_ids(&self) -> Vec<u32>
Returns the internal table ids without the mview table.
pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_
Returns all internal table ids including the mview table.
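The difference between the two accessors can be shown with a small stand-in type. `JobTables` and its fields are hypothetical names used only for this sketch. The assumption is that the internal ids are simply all table ids with the mview table filtered out.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in holding the table ids that belong to one streaming job.
pub struct JobTables {
    /// Id of the materialized view table, if the job has one.
    pub mv_table_id: Option<u32>,
    /// Ids of all state tables of the job, including the mview table.
    pub table_ids: BTreeSet<u32>,
}

impl JobTables {
    /// All table ids, including the mview table.
    pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
        self.table_ids.iter().copied()
    }

    /// Table ids without the mview table.
    pub fn internal_table_ids(&self) -> Vec<u32> {
        self.table_ids
            .iter()
            .copied()
            .filter(|id| Some(*id) != self.mv_table_id)
            .collect()
    }
}
```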
pub fn fill_expr_context(self) -> Self
Fill the expr_context in StreamActor. Used for compatibility.
Trait Implementations§
impl Clone for StreamJobFragments
fn clone(&self) -> StreamJobFragments
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
Auto Trait Implementations§
impl Freeze for StreamJobFragments
impl RefUnwindSafe for StreamJobFragments
impl Send for StreamJobFragments
impl Sync for StreamJobFragments
impl Unpin for StreamJobFragments
impl UnwindSafe for StreamJobFragments
Blanket Implementations§
impl<T> AsAny for T
fn any_ref(&self) -> &(dyn Any + Sync + Send + 'static)
Obtains a dyn Any reference to the object.
fn as_any(self: Arc<T>) -> Arc<dyn Any + Sync + Send>
Obtains an Arc<dyn Any> reference to the object.
fn into_any(self: Box<T>) -> Box<dyn Any + Sync + Send>
Converts the object to Box<dyn Any>.
fn type_name(&self) -> &'static str
Convenient wrapper for std::any::type_name, since Any does not provide it and
Any::type_id is useless as a debugging aid (its Debug is just a mess of hex digits).
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone
impl<T> Conv for T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self> where &'a Self: for<'a> IntoIterator
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
impl<M> MetricVecRelabelExt for M
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to [RelabeledMetricVec::with_metric_level].
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
Equivalent to [RelabeledMetricVec::with_metric_level_relabel_n].
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to [RelabeledMetricVec::with_metric_level_relabel_n] with metric_level set to
MetricLevel::Debug and relabel_num set to 1.
impl<T> Pipe for T where T: ?Sized
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> Scope for T
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.