pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: u8> {Show 16 fields
ctx: ActorContextRef,
info: ExecutorInfo,
input_l: Option<Executor>,
input_r: Option<Executor>,
actual_output_data_types: Vec<DataType>,
side_l: JoinSide<K, S>,
side_r: JoinSide<K, S>,
cond: Option<NonStrictExpression>,
inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
inequality_watermarks: Vec<Option<Watermark>>,
append_only_optimize: bool,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
cnt_rows_received: u32,
watermark_buffers: BTreeMap<usize, BufferedWatermarks<u8>>,
high_join_amplification_threshold: usize,
}
Expand description
HashJoinExecutor
takes two input streams and runs equal hash join on them.
The output columns are the concatenation of left and right columns.
Fields§
§ctx: ActorContextRef
§info: ExecutorInfo
§input_l: Option<Executor>
Left input executor
input_r: Option<Executor>
Right input executor
actual_output_data_types: Vec<DataType>
The data types of the formed new columns
side_l: JoinSide<K, S>
The parameters of the left join executor
side_r: JoinSide<K, S>
The parameters of the right join executor
cond: Option<NonStrictExpression>
Optional non-equi join conditions
inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>
Column indices of watermark output and offset expression of each inequality, respectively.
inequality_watermarks: Vec<Option<Watermark>>
The output watermark of each inequality condition and its value is the minimum of the
calculation result of both side. It will be used to generate watermark into downstream
and do state cleaning if clean_state
field of that inequality is true
.
append_only_optimize: bool
Whether the logic can be optimized for append-only stream
metrics: Arc<StreamingMetrics>
§chunk_size: usize
The maximum size of the chunk produced by executor at a time
cnt_rows_received: u32
Count the messages received, clear to 0 when counted to EVICT_EVERY_N_MESSAGES
watermark_buffers: BTreeMap<usize, BufferedWatermarks<u8>>
watermark column index -> BufferedWatermarks
high_join_amplification_threshold: usize
Implementations§
source§impl<K: HashKey, S: StateStore, const T: u8> HashJoinExecutor<K, S, T>
impl<K: HashKey, S: StateStore, const T: u8> HashJoinExecutor<K, S, T>
pub fn new( ctx: ActorContextRef, info: ExecutorInfo, input_l: Executor, input_r: Executor, params_l: JoinParams, params_r: JoinParams, null_safe: Vec<bool>, output_indices: Vec<usize>, cond: Option<NonStrictExpression>, inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>, state_table_l: StateTable<S>, degree_state_table_l: StateTable<S>, state_table_r: StateTable<S>, degree_state_table_r: StateTable<S>, watermark_epoch: AtomicU64Ref, is_append_only: bool, metrics: Arc<StreamingMetrics>, chunk_size: usize, high_join_amplification_threshold: usize, ) -> Self
fn into_stream(self) -> impl Stream<Item = Result<Message, StreamExecutorError>>
async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>
async fn try_flush_data(&mut self) -> StreamExecutorResult<()>
fn evict_cache( side_update: &mut JoinSide<K, S>, side_match: &mut JoinSide<K, S>, cnt_rows_received: &mut u32, )
async fn handle_watermark( &mut self, side: u8, watermark: Watermark, ) -> StreamExecutorResult<Vec<Watermark>>
sourceasync fn hash_eq_match(
key: &K,
ht: &mut JoinHashMap<K, S>,
) -> StreamExecutorResult<Option<Box<JoinEntryState>>>
async fn hash_eq_match( key: &K, ht: &mut JoinHashMap<K, S>, ) -> StreamExecutorResult<Option<Box<JoinEntryState>>>
the data the hash table and match the coming data chunk with the executor state
fn row_concat( row_update: &RowRef<'_>, update_start_pos: usize, row_matched: &OwnedRow, matched_start_pos: usize, ) -> OwnedRow
sourcefn eq_join_left(
args: EqJoinArgs<'_, K, S>,
) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_
fn eq_join_left( args: EqJoinArgs<'_, K, S>, ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_
Used to forward eq_join_oneside
to show join side in stack.
sourcefn eq_join_right(
args: EqJoinArgs<'_, K, S>,
) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_
fn eq_join_right( args: EqJoinArgs<'_, K, S>, ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_
Used to forward eq_join_oneside
to show join side in stack.
fn eq_join_oneside<'_async0, const SIDE: u8>( args: EqJoinArgs<'_async0, K, S>, ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_async0
Trait Implementations§
source§impl<K: HashKey, S: StateStore, const T: u8> Debug for HashJoinExecutor<K, S, T>
impl<K: HashKey, S: StateStore, const T: u8> Debug for HashJoinExecutor<K, S, T>
source§impl<K: HashKey, S: StateStore, const T: u8> Execute for HashJoinExecutor<K, S, T>
impl<K: HashKey, S: StateStore, const T: u8> Execute for HashJoinExecutor<K, S, T>
Auto Trait Implementations§
impl<K, S, const T: u8> !Freeze for HashJoinExecutor<K, S, T>
impl<K, S, const T: u8> !RefUnwindSafe for HashJoinExecutor<K, S, T>
impl<K, S, const T: u8> Send for HashJoinExecutor<K, S, T>
impl<K, S, const T: u8> !Sync for HashJoinExecutor<K, S, T>
impl<K, S, const T: u8> Unpin for HashJoinExecutor<K, S, T>
impl<K, S, const T: u8> !UnwindSafe for HashJoinExecutor<K, S, T>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait>
(where Trait: Downcast
) to Box<dyn Any>
. Box<dyn Any>
can
then be further downcast
into Box<ConcreteType>
where ConcreteType
implements Trait
.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait>
(where Trait: Downcast
) to Rc<Any>
. Rc<Any>
can then be
further downcast
into Rc<ConcreteType>
where ConcreteType
implements Trait
.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &Any
’s vtable from &Trait
’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &mut Any
’s vtable from &mut Trait
’s.§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
§impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
source§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
source§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level
.source§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
.source§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
with metric_level
set to
MetricLevel::Debug
and relabel_num
set to 1.§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<Source> Sculptor<HNil, HNil> for Source
impl<Source> Sculptor<HNil, HNil> for Source
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.