pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {Show 15 fields
input: Executor,
schema: Schema,
state_table: StateTableInner<S, SD>,
arrange_key_indices: Vec<usize>,
actor_context: ActorContextRef,
materialize_cache: MaterializeCache<SD>,
conflict_behavior: ConflictBehavior,
version_column_indices: Vec<u32>,
may_have_downstream: bool,
depended_subscription_ids: HashSet<u32>,
metrics: MaterializeMetrics,
is_dummy_table: bool,
toastable_column_indices: Option<Vec<usize>>,
refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
local_barrier_manager: LocalBarrierManager,
}
Expand description
MaterializeExecutor
materializes changes in stream into a materialized view on storage.
Fields§
§input: Executor
§schema: Schema
§state_table: StateTableInner<S, SD>
§arrange_key_indices: Vec<usize>
Columns of arrange keys (including pk, group keys, join keys, etc.)
actor_context: ActorContextRef
§materialize_cache: MaterializeCache<SD>
§conflict_behavior: ConflictBehavior
§version_column_indices: Vec<u32>
§may_have_downstream: bool
§depended_subscription_ids: HashSet<u32>
§metrics: MaterializeMetrics
§is_dummy_table: bool
No data will be written to hummock table. This Materialize is just a dummy node. Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
toastable_column_indices: Option<Vec<usize>>
Indices of TOAST-able columns for PostgreSQL CDC tables. None means either non-CDC table or CDC table without TOAST-able columns.
refresh_args: Option<RefreshableMaterializeArgs<S, SD>>
Optional refresh arguments and state for refreshable materialized views
local_barrier_manager: LocalBarrierManager
Local barrier manager for reporting barrier events
Implementations§
Source§impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD>
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD>
Sourcepub async fn new(
input: Executor,
schema: Schema,
store: S,
arrange_key: Vec<ColumnOrder>,
actor_context: ActorContextRef,
vnodes: Option<Arc<Bitmap>>,
table_catalog: &Table,
watermark_epoch: AtomicU64Ref,
conflict_behavior: ConflictBehavior,
version_column_indices: Vec<u32>,
metrics: Arc<StreamingMetrics>,
refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
local_barrier_manager: LocalBarrierManager,
) -> Self
pub async fn new( input: Executor, schema: Schema, store: S, arrange_key: Vec<ColumnOrder>, actor_context: ActorContextRef, vnodes: Option<Arc<Bitmap>>, table_catalog: &Table, watermark_epoch: AtomicU64Ref, conflict_behavior: ConflictBehavior, version_column_indices: Vec<u32>, metrics: Arc<StreamingMetrics>, refresh_args: Option<RefreshableMaterializeArgs<S, SD>>, local_barrier_manager: LocalBarrierManager, ) -> Self
Create a new MaterializeExecutor
with distribution specified with distribution_keys
and
vnodes
. For singleton distribution, distribution_keys
should be empty and vnodes
should be None
.
fn execute_inner( self, ) -> impl Stream<Item = Result<Message, StreamExecutorError>>
Sourcefn make_mergesort_stream<'a>(
main_table: &'a StateTableInner<S, SD>,
staging_table: &'a StateTableInner<S, SD>,
progress_table: &'a mut RefreshProgressTable<S>,
) -> impl Stream<Item = Result<Option<(VirtualNode, OwnedRow)>, StreamExecutorError>> + 'a
fn make_mergesort_stream<'a>( main_table: &'a StateTableInner<S, SD>, staging_table: &'a StateTableInner<S, SD>, progress_table: &'a mut RefreshProgressTable<S>, ) -> impl Stream<Item = Result<Option<(VirtualNode, OwnedRow)>, StreamExecutorError>> + 'a
Stream that yields rows to be deleted from main table.
Yields Some((vnode, row))
for rows that exist in main but not in staging.
Yields None
when finished processing all vnodes.
Sourcefn may_update_depended_subscriptions(
depended_subscriptions: &mut HashSet<u32>,
barrier: &Barrier,
mv_table_id: TableId,
)
fn may_update_depended_subscriptions( depended_subscriptions: &mut HashSet<u32>, barrier: &Barrier, mv_table_id: TableId, )
return true when changed
Sourcefn init_refresh_progress(
state_table: &StateTableInner<S, SD>,
progress_table: &mut RefreshProgressTable<S>,
_epoch: u64,
) -> StreamExecutorResult<()>
fn init_refresh_progress( state_table: &StateTableInner<S, SD>, progress_table: &mut RefreshProgressTable<S>, _epoch: u64, ) -> StreamExecutorResult<()>
Initialize refresh progress tracking for all VNodes
Source§impl<S: StateStore> MaterializeExecutor<S, BasicSerde>
impl<S: StateStore> MaterializeExecutor<S, BasicSerde>
Sourcepub async fn for_test(
input: Executor,
store: S,
table_id: TableId,
keys: Vec<ColumnOrder>,
column_ids: Vec<ColumnId>,
watermark_epoch: AtomicU64Ref,
conflict_behavior: ConflictBehavior,
) -> Self
pub async fn for_test( input: Executor, store: S, table_id: TableId, keys: Vec<ColumnOrder>, column_ids: Vec<ColumnId>, watermark_epoch: AtomicU64Ref, conflict_behavior: ConflictBehavior, ) -> Self
Create a new MaterializeExecutor
without distribution info for test purpose.
Trait Implementations§
Source§impl<S: StateStore, SD: ValueRowSerde> Debug for MaterializeExecutor<S, SD>
impl<S: StateStore, SD: ValueRowSerde> Debug for MaterializeExecutor<S, SD>
Source§impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD>
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD>
Auto Trait Implementations§
impl<S, SD> !Freeze for MaterializeExecutor<S, SD>
impl<S, SD> !RefUnwindSafe for MaterializeExecutor<S, SD>
impl<S, SD> Send for MaterializeExecutor<S, SD>
impl<S, SD> !Sync for MaterializeExecutor<S, SD>
impl<S, SD> Unpin for MaterializeExecutor<S, SD>
impl<S, SD> !UnwindSafe for MaterializeExecutor<S, SD>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait>
(where Trait: Downcast
) to Box<dyn Any>
, which can then be
downcast
into Box<dyn ConcreteType>
where ConcreteType
implements Trait
.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait>
(where Trait: Downcast
) to Rc<Any>
, which can then be further
downcast
into Rc<ConcreteType>
where ConcreteType
implements Trait
.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &Any
’s vtable from &Trait
’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &mut Any
’s vtable from &mut Trait
’s.§impl<T> DowncastSend for T
impl<T> DowncastSend for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
§impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level
].§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
].§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
] with metric_level
set to
MetricLevel::Debug
and relabel_num
set to 1.§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Scope for T
impl<T> Scope for T
§impl<Source> Sculptor<HNil, HNil> for Source
impl<Source> Sculptor<HNil, HNil> for Source
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.