pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: u8, E: JoinEncoding> {
ctx: ActorContextRef,
info: ExecutorInfo,
input_l: Option<Executor>,
input_r: Option<Executor>,
actual_output_data_types: Vec<DataType>,
side_l: JoinSide<K, S, E>,
side_r: JoinSide<K, S, E>,
cond: Option<NonStrictExpression>,
inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
inequality_watermarks: Vec<Option<Watermark>>,
append_only_optimize: bool,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
cnt_rows_received: u32,
watermark_buffers: BTreeMap<usize, BufferedWatermarks<u8>>,
high_join_amplification_threshold: usize,
entry_state_max_rows: usize,
}
HashJoinExecutor takes two input streams and runs a hash equi-join on them.
The output columns are the concatenation of left and right columns.
Fields§
§ctx: ActorContextRef
§info: ExecutorInfo
§input_l: Option<Executor>
Left input executor
§input_r: Option<Executor>
Right input executor
§actual_output_data_types: Vec<DataType>
The data types of the newly formed columns
§side_l: JoinSide<K, S, E>
The parameters of the left join executor
§side_r: JoinSide<K, S, E>
The parameters of the right join executor
§cond: Option<NonStrictExpression>
Optional non-equi join conditions
§inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>
For each inequality, the column indices of the watermark output and the offset expression, respectively.
§inequality_watermarks: Vec<Option<Watermark>>
The output watermark of each inequality condition; its value is the minimum of the
calculation results of both sides. It is used to emit watermarks downstream
and to do state cleaning if the clean_state field of that inequality is true.
§append_only_optimize: bool
Whether the logic can be optimized for append-only streams
§metrics: Arc<StreamingMetrics>
§chunk_size: usize
The maximum size of a chunk produced by the executor at a time
§cnt_rows_received: u32
Counts the messages received; reset to 0 once the count reaches EVICT_EVERY_N_MESSAGES
§watermark_buffers: BTreeMap<usize, BufferedWatermarks<u8>>
Maps watermark column index -> BufferedWatermarks
§high_join_amplification_threshold: usize
Threshold at which to alert on high join amplification
§entry_state_max_rows: usize
Max number of rows that will be cached in the entry state.
Implementations§
Source§impl<K: HashKey, S: StateStore, const T: u8, E: JoinEncoding> HashJoinExecutor<K, S, T, E>
impl<K: HashKey, S: StateStore, const T: u8, E: JoinEncoding> HashJoinExecutor<K, S, T, E>
pub fn new( ctx: ActorContextRef, info: ExecutorInfo, input_l: Executor, input_r: Executor, params_l: JoinParams, params_r: JoinParams, null_safe: Vec<bool>, output_indices: Vec<usize>, cond: Option<NonStrictExpression>, inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>, state_table_l: StateTable<S>, degree_state_table_l: StateTable<S>, state_table_r: StateTable<S>, degree_state_table_r: StateTable<S>, watermark_epoch: AtomicU64Ref, is_append_only: bool, metrics: Arc<StreamingMetrics>, chunk_size: usize, high_join_amplification_threshold: usize, ) -> Self
pub fn new_with_cache_size( ctx: ActorContextRef, info: ExecutorInfo, input_l: Executor, input_r: Executor, params_l: JoinParams, params_r: JoinParams, null_safe: Vec<bool>, output_indices: Vec<usize>, cond: Option<NonStrictExpression>, inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>, state_table_l: StateTable<S>, degree_state_table_l: StateTable<S>, state_table_r: StateTable<S>, degree_state_table_r: StateTable<S>, watermark_epoch: AtomicU64Ref, is_append_only: bool, metrics: Arc<StreamingMetrics>, chunk_size: usize, high_join_amplification_threshold: usize, entry_state_max_rows: Option<usize>, ) -> Self
fn into_stream(self) -> impl Stream<Item = Result<Message, StreamExecutorError>>
async fn flush_data( &mut self, epoch: EpochPair, ) -> StreamExecutorResult<(JoinHashMapPostCommit<'_, K, S, E>, JoinHashMapPostCommit<'_, K, S, E>)>
async fn try_flush_data(&mut self) -> StreamExecutorResult<()>
fn evict_cache( side_update: &mut JoinSide<K, S, E>, side_match: &mut JoinSide<K, S, E>, cnt_rows_received: &mut u32, )
async fn handle_watermark( &mut self, side: u8, watermark: Watermark, ) -> StreamExecutorResult<Vec<Watermark>>
fn row_concat( row_update: impl Row, update_start_pos: usize, row_matched: impl Row, matched_start_pos: usize, ) -> OwnedRow
Sourcefn eq_join_left(
args: EqJoinArgs<'_, K, S, E>,
) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_
fn eq_join_left( args: EqJoinArgs<'_, K, S, E>, ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_
Forwards to eq_join_oneside so that the join side is visible in the call stack.
Sourcefn eq_join_right(
args: EqJoinArgs<'_, K, S, E>,
) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_
fn eq_join_right( args: EqJoinArgs<'_, K, S, E>, ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_
Forwards to eq_join_oneside so that the join side is visible in the call stack.
fn eq_join_oneside<'_async0, const SIDE: u8>( args: EqJoinArgs<'_async0, K, S, E>, ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_async0
Sourcefn handle_match_rows<'a, const SIDE: u8, const JOIN_OP: bool>(
cached_lookup_result: CacheResult<E>,
row: RowRef<'a>,
key: &'a K,
hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
side_match: &'a mut JoinSide<K, S, E>,
side_update: &'a mut JoinSide<K, S, E>,
useful_state_clean_columns: &'a [(usize, &'a Watermark)],
cond: &'a mut Option<NonStrictExpression>,
append_only_optimize: bool,
entry_state_max_rows: usize,
) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + 'a
fn handle_match_rows<'a, const SIDE: u8, const JOIN_OP: bool>( cached_lookup_result: CacheResult<E>, row: RowRef<'a>, key: &'a K, hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>, side_match: &'a mut JoinSide<K, S, E>, side_update: &'a mut JoinSide<K, S, E>, useful_state_clean_columns: &'a [(usize, &'a Watermark)], cond: &'a mut Option<NonStrictExpression>, append_only_optimize: bool, entry_state_max_rows: usize, ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + 'a
For the probe-side row, we need to check whether it has values in the cache; if not, we need to fetch the matched rows from the state table.
Every matched build-side row being processed needs to go through the following phases:
- Handle join condition evaluation.
- Always do cache refill, if the state count is good.
- Handle state cleaning.
- Handle degree table update.
async fn handle_match_row<'a, R: Row, RO: Row, const SIDE: u8, const JOIN_OP: bool, const MATCHED_ROWS_FROM_CACHE: bool>( update_row: RowRef<'a>, matched_row: JoinRow<R>, matched_row_cache_ref: Option<&mut E::EncodedRow>, hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>, match_order_key_indices: &[usize], match_degree_table: &mut Option<TableInner<S>>, side_update_start_pos: usize, side_match_start_pos: usize, cond: &Option<NonStrictExpression>, update_row_degree: &mut u64, useful_state_clean_columns: &[(usize, &'a Watermark)], append_only_optimize: bool, append_only_matched_row: &mut Option<JoinRow<RO>>, matched_rows_to_clean: &mut Vec<JoinRow<RO>>, map_output: impl Fn(R) -> RO, ) -> Option<StreamChunk>
async fn check_join_condition( row: impl Row, side_update_start_pos: usize, matched_row: impl Row, side_match_start_pos: usize, join_condition: &Option<NonStrictExpression>, ) -> bool
Trait Implementations§
Source§impl<K: HashKey, S: StateStore, const T: u8, E: JoinEncoding> Debug for HashJoinExecutor<K, S, T, E>
impl<K: HashKey, S: StateStore, const T: u8, E: JoinEncoding> Debug for HashJoinExecutor<K, S, T, E>
Source§impl<K: HashKey, S: StateStore, const T: u8, E: JoinEncoding> Execute for HashJoinExecutor<K, S, T, E>
impl<K: HashKey, S: StateStore, const T: u8, E: JoinEncoding> Execute for HashJoinExecutor<K, S, T, E>
Auto Trait Implementations§
impl<K, S, const T: u8, E> !Freeze for HashJoinExecutor<K, S, T, E>
impl<K, S, const T: u8, E> !RefUnwindSafe for HashJoinExecutor<K, S, T, E>
impl<K, S, const T: u8, E> Send for HashJoinExecutor<K, S, T, E>
impl<K, S, const T: u8, E> !Sync for HashJoinExecutor<K, S, T, E>
impl<K, S, const T: u8, E> Unpin for HashJoinExecutor<K, S, T, E>
impl<K, S, const T: u8, E> !UnwindSafe for HashJoinExecutor<K, S, T, E>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Converts Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>, which can then be
downcast into Box<dyn ConcreteType> where ConcreteType implements Trait.
§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Converts Rc<Trait> (where Trait: Downcast) to Rc<Any>, which can then be further
downcast into Rc<ConcreteType> where ConcreteType implements Trait.
§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
Converts &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &Any’s vtable from &Trait’s.
§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Converts &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot
generate &mut Any’s vtable from &mut Trait’s.
§impl<T> DowncastSend for T
impl<T> DowncastSend for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
Causes self to use its Display implementation when
Debug-formatted.
§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
Causes self to use its LowerExp implementation when
Debug-formatted.
§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
Causes self to use its LowerHex implementation when
Debug-formatted.
§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
Causes self to use its Pointer implementation when
Debug-formatted.
§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
Causes self to use its UpperExp implementation when
Debug-formatted.
§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
Causes self to use its UpperHex implementation when
Debug-formatted.
§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
Source§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
Source§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level.
Source§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n.
Source§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to
MetricLevel::Debug and relabel_num set to 1.
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
Borrows self and passes that borrow into the pipe function. Read more
§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
Mutably borrows self and passes that borrow into the pipe function. Read more
§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe
function.
§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Scope for T
impl<T> Scope for T
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value. Read more
§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value. Read more
§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value. Read more
§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value. Read more
§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value. Read more
§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value. Read more
§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release
builds.
§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release
builds.
§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release
builds.
§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release
builds.
§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release
builds.
§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release
builds.