pub struct AggGroup<S: StateStore, Strtg: Strategy> {
group_key: Option<GroupKey>,
states: Vec<AggState>,
prev_outputs: Option<OwnedRow>,
row_count_index: usize,
_phantom: PhantomData<(S, Strtg)>,
}
Expand description
AggGroup
manages agg states of all agg calls for one group_key
.
Fields§
§group_key: Option<GroupKey>
Group key.
states: Vec<AggState>
Current managed states for all AggCall
s.
prev_outputs: Option<OwnedRow>
Previous outputs of aggregate functions. Initializing with None
.
row_count_index: usize
Index of row count agg call (count(*)
) in the call list.
_phantom: PhantomData<(S, Strtg)>
Implementations§
source§impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg>
impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg>
sourcepub async fn create(
version: PbAggNodeVersion,
group_key: Option<GroupKey>,
agg_calls: &[AggCall],
agg_funcs: &[BoxedAggregateFunction],
storages: &[AggStateStorage<S>],
intermediate_state_table: &StateTable<S>,
pk_indices: &PkIndices,
row_count_index: usize,
extreme_cache_size: usize,
input_schema: &Schema,
) -> StreamExecutorResult<Self>
pub async fn create( version: PbAggNodeVersion, group_key: Option<GroupKey>, agg_calls: &[AggCall], agg_funcs: &[BoxedAggregateFunction], storages: &[AggStateStorage<S>], intermediate_state_table: &StateTable<S>, pk_indices: &PkIndices, row_count_index: usize, extreme_cache_size: usize, input_schema: &Schema, ) -> StreamExecutorResult<Self>
Create AggGroup
for the given AggCall
s and group_key
.
For crate::executor::SimpleAggExecutor
, the group_key
should be None
.
sourcepub fn create_eowc(
version: PbAggNodeVersion,
group_key: Option<GroupKey>,
agg_calls: &[AggCall],
agg_funcs: &[BoxedAggregateFunction],
storages: &[AggStateStorage<S>],
encoded_states: &OwnedRow,
pk_indices: &PkIndices,
row_count_index: usize,
extreme_cache_size: usize,
input_schema: &Schema,
) -> StreamExecutorResult<Self>
pub fn create_eowc( version: PbAggNodeVersion, group_key: Option<GroupKey>, agg_calls: &[AggCall], agg_funcs: &[BoxedAggregateFunction], storages: &[AggStateStorage<S>], encoded_states: &OwnedRow, pk_indices: &PkIndices, row_count_index: usize, extreme_cache_size: usize, input_schema: &Schema, ) -> StreamExecutorResult<Self>
Create a group from encoded states for EOWC. The previous output is set to None
.
pub fn group_key(&self) -> Option<&GroupKey>
pub fn group_key_row(&self) -> OwnedRow
fn prev_row_count(&self) -> usize
sourcefn curr_row_count(&self) -> usize
fn curr_row_count(&self) -> usize
Get current row count of this group.
pub(crate) fn is_uninitialized(&self) -> bool
sourcepub async fn apply_chunk(
&mut self,
chunk: &StreamChunk,
calls: &[AggCall],
funcs: &[BoxedAggregateFunction],
visibilities: Vec<Bitmap>,
) -> StreamExecutorResult<()>
pub async fn apply_chunk( &mut self, chunk: &StreamChunk, calls: &[AggCall], funcs: &[BoxedAggregateFunction], visibilities: Vec<Bitmap>, ) -> StreamExecutorResult<()>
Apply input chunk to all managed agg states.
mappings
contains the column mappings from input chunk to each agg call.
visibilities
contains the row visibility of the input chunk for each agg call.
sourcefn reset(
&mut self,
funcs: &[BoxedAggregateFunction],
) -> StreamExecutorResult<()>
fn reset( &mut self, funcs: &[BoxedAggregateFunction], ) -> StreamExecutorResult<()>
Reset all in-memory states to their initial state, i.e. to reset all agg state structs to the status as if they are just created, no input applied and no row in state table.
sourcepub fn encode_states(
&self,
funcs: &[BoxedAggregateFunction],
) -> StreamExecutorResult<OwnedRow>
pub fn encode_states( &self, funcs: &[BoxedAggregateFunction], ) -> StreamExecutorResult<OwnedRow>
Encode intermediate states.
sourceasync fn get_outputs(
&mut self,
storages: &[AggStateStorage<S>],
funcs: &[BoxedAggregateFunction],
) -> StreamExecutorResult<(usize, OwnedRow, AggStateCacheStats)>
async fn get_outputs( &mut self, storages: &[AggStateStorage<S>], funcs: &[BoxedAggregateFunction], ) -> StreamExecutorResult<(usize, OwnedRow, AggStateCacheStats)>
Get the outputs of all managed agg states, without group key prefix. Possibly need to read/sync from state table if the state not cached in memory. This method is idempotent, i.e. it can be called multiple times and the outputs are guaranteed to be the same.
sourcepub async fn build_change(
&mut self,
storages: &[AggStateStorage<S>],
funcs: &[BoxedAggregateFunction],
) -> StreamExecutorResult<(Option<Record<OwnedRow>>, AggStateCacheStats)>
pub async fn build_change( &mut self, storages: &[AggStateStorage<S>], funcs: &[BoxedAggregateFunction], ) -> StreamExecutorResult<(Option<Record<OwnedRow>>, AggStateCacheStats)>
Build aggregation result change, according to previous and current agg outputs. The saved previous outputs will be updated to the latest outputs after this method.
Trait Implementations§
source§impl<S: StateStore, Strtg: Strategy> EstimateSize for AggGroup<S, Strtg>
impl<S: StateStore, Strtg: Strategy> EstimateSize for AggGroup<S, Strtg>
source§fn estimated_heap_size(&self) -> usize
fn estimated_heap_size(&self) -> usize
source§fn estimated_size(&self) -> usizewhere
Self: Sized,
fn estimated_size(&self) -> usizewhere
Self: Sized,
estimated_heap_size
and the size of Self
.source§impl<S: StateStore> EstimateSize for Box<AggGroup<S, OnlyOutputIfHasInput>>
impl<S: StateStore> EstimateSize for Box<AggGroup<S, OnlyOutputIfHasInput>>
source§fn estimated_heap_size(&self) -> usize
fn estimated_heap_size(&self) -> usize
source§fn estimated_size(&self) -> usizewhere
Self: Sized,
fn estimated_size(&self) -> usizewhere
Self: Sized,
estimated_heap_size
and the size of Self
.Auto Trait Implementations§
impl<S, Strtg> Freeze for AggGroup<S, Strtg>
impl<S, Strtg> !RefUnwindSafe for AggGroup<S, Strtg>
impl<S, Strtg> Send for AggGroup<S, Strtg>where
Strtg: Send,
impl<S, Strtg> Sync for AggGroup<S, Strtg>where
Strtg: Sync,
impl<S, Strtg> Unpin for AggGroup<S, Strtg>
impl<S, Strtg> !UnwindSafe for AggGroup<S, Strtg>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait>
(where Trait: Downcast
) to Box<dyn Any>
. Box<dyn Any>
can
then be further downcast
into Box<ConcreteType>
where ConcreteType
implements Trait
.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait>
(where Trait: Downcast
) to Rc<Any>
. Rc<Any>
can then be
further downcast
into Rc<ConcreteType>
where ConcreteType
implements Trait
.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &Any
’s vtable from &Trait
’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &mut Any
’s vtable from &mut Trait
’s.§impl<T> DowncastSync for T
impl<T> DowncastSync for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
§impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
source§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
source§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level
.source§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
.source§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
with metric_level
set to
MetricLevel::Debug
and relabel_num
set to 1.§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<Source> Sculptor<HNil, HNil> for Source
impl<Source> Sculptor<HNil, HNil> for Source
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.