pub struct JoinHashMap<K: HashKey, S: StateStore, E: JoinEncoding> {
inner: ManagedLruCache<K, HashValueWrapper<E>, PrecomputedBuildHasher>,
join_key_data_types: Vec<DataType>,
null_matched: K::Bitmap,
pk_serializer: OrderedRowSerde,
state: TableInner<S>,
degree_state: Option<TableInner<S>>,
need_degree_table: bool,
pk_contained_in_jk: bool,
inequality_key_desc: Option<InequalityKeyDesc>,
metrics: JoinHashMapMetrics,
_marker: PhantomData<E>,
}
Fields
inner: ManagedLruCache<K, HashValueWrapper<E>, PrecomputedBuildHasher>
Store the join states.
join_key_data_types: Vec<DataType>
Data types of the join key columns.
null_matched: K::Bitmap
Null safe bitmap for each join pair.
pk_serializer: OrderedRowSerde
The memcomparable serializer of primary key.
state: TableInner<S>
State table. Contains the data from upstream.
degree_state: Option<TableInner<S>>
Degree table.
The degree is generated by the hash join executor.
Each row in the state table has a corresponding degree in the degree table.
A degree value d for a row means the row has d matched rows on the other join side.
The degree table is only maintained for the sides that need it:
- Full Outer: both sides
- Left Outer/Semi/Anti: left side
- Right Outer/Semi/Anti: right side
- Inner: neither side
Should be set to None if need_degree_table was set to false.
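The list above can be read as a small decision function. The sketch below is illustrative only: `JoinKind`, `Side`, and the free function `need_degree_table` are hypothetical names introduced here, not the executor's actual types.

```rust
/// Hypothetical join kinds, mirroring the list in the documentation above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum JoinKind {
    Inner,
    LeftOuter,
    LeftSemi,
    LeftAnti,
    RightOuter,
    RightSemi,
    RightAnti,
    FullOuter,
}

/// Hypothetical marker for which input of the join a hash map belongs to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Side {
    Left,
    Right,
}

/// Returns true if the given side has to keep a degree table for this join kind.
fn need_degree_table(kind: JoinKind, side: Side) -> bool {
    use JoinKind::*;
    match kind {
        // Both sides may have to emit NULL-padded rows.
        FullOuter => true,
        // Only the left side's output depends on its match count.
        LeftOuter | LeftSemi | LeftAnti => side == Side::Left,
        // Only the right side's output depends on its match count.
        RightOuter | RightSemi | RightAnti => side == Side::Right,
        // Inner joins only emit matched pairs, so no degree is tracked.
        Inner => false,
    }
}
```

Under this reading, a side for which the function returns false would pass `None` as its `degree_state`.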
The degree of each row will tell us if we need to emit NULL for the row.
For instance, given lhs LEFT JOIN rhs,
If the degree of a row in lhs is 0, it means the row does not have a match in rhs.
If the degree of a row in lhs is 2, it means the row has two matches in rhs.
Now, when emitting the result of the join, we need to emit NULL for the row in lhs if
the degree is 0.
Why not just use a boolean value instead of a degree count?
Consider the case where we delete a matched record from rhs.
Since that record existed and matched, the boolean for the lhs record must have been true.
But after removing it, we cannot tell whether any other rhs records still match,
because a boolean does not record how many matches there were.
Hence we need to store the count of matched records.
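The argument for a count can be made concrete with a toy model of one lhs row in `lhs LEFT JOIN rhs`. This is a minimal sketch under simplifying assumptions: `LhsRow`, `Emit`, and the two handler methods are hypothetical and do not correspond to the executor's real code path.

```rust
/// Toy model of a single lhs row; hypothetical, for illustration only.
#[derive(Debug, Default)]
struct LhsRow {
    /// Number of rhs rows that currently match this row.
    degree: u64,
}

/// What the join would have to do about NULL padding for this lhs row.
#[derive(Debug, PartialEq, Eq)]
enum Emit {
    /// The row still has (or still lacks) matches; NULL padding is unchanged.
    Nothing,
    /// The first match arrived, so the NULL-padded row is no longer valid.
    RetractNullPadded,
    /// The last match was removed, so the NULL-padded row must be emitted.
    EmitNullPadded,
}

impl LhsRow {
    /// A matching rhs row was inserted.
    fn on_rhs_insert(&mut self) -> Emit {
        self.degree += 1;
        if self.degree == 1 {
            Emit::RetractNullPadded
        } else {
            Emit::Nothing
        }
    }

    /// A matching rhs row was deleted.
    fn on_rhs_delete(&mut self) -> Emit {
        assert!(self.degree > 0, "cannot delete a match that was never counted");
        self.degree -= 1;
        if self.degree == 0 {
            Emit::EmitNullPadded
        } else {
            Emit::Nothing
        }
    }
}
```

With two matches and one delete, the count drops to 1 and nothing is emitted. A boolean could not distinguish that case from the one where the last match is removed.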
need_degree_table: bool
Whether the degree table is needed.
pk_contained_in_jk: bool
Whether the primary key is contained in the join key.
inequality_key_desc: Option<InequalityKeyDesc>
Inequality key description for AsOf join.
metrics: JoinHashMapMetrics
Metrics of the hash map.
_marker: PhantomData<E>
Implementations
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinHashMap<K, S, E>
pub(crate) fn get_degree_state_mut_ref(&mut self) -> (&[usize], &mut Option<TableInner<S>>)
pub(crate) async fn fetch_matched_rows_and_get_degree_table_ref<'a>(
&'a mut self,
key: &'a K,
) -> StreamExecutorResult<(impl Stream<Item = StreamExecutorResult<(Vec<u8>, JoinRow<OwnedRow>)>> + 'a, &'a [usize], &'a mut Option<TableInner<S>>)>
NOTE(kwannoel): This allows us to concurrently stream records from the state_table,
and update the degree table, without using unsafe code.
This is because we obtain separate references to separate parts of the JoinHashMap,
instead of reusing the same reference to JoinHashMap for concurrent read access to state_table,
and write access to the degree table.
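The borrow-splitting idea in this note can be sketched with a much smaller type. The example is an assumption-laden illustration: `ToyMap`, `rows_and_degrees`, and `bump_degrees` are hypothetical, and plain vectors stand in for the state and degree tables.

```rust
/// Hypothetical stand-in for `JoinHashMap`, with vectors in place of state tables.
struct ToyMap {
    state: Vec<u32>,
    degree_state: Option<Vec<u64>>,
}

impl ToyMap {
    /// Hands out a read-only iterator over `state` together with a mutable
    /// reference to `degree_state`. Destructuring `self` lets the borrow
    /// checker see that the two borrows touch disjoint fields.
    fn rows_and_degrees(&mut self) -> (impl Iterator<Item = &u32> + '_, &mut Option<Vec<u64>>) {
        let Self { state, degree_state } = self;
        (state.iter(), degree_state)
    }
}

/// Reads rows and updates degrees in the same loop, with no `unsafe`.
fn bump_degrees(map: &mut ToyMap) {
    let (rows, degrees) = map.rows_and_degrees();
    if let Some(degrees) = degrees {
        // One degree per row: increment each degree while iterating the rows.
        for (_row, degree) in rows.zip(degrees.iter_mut()) {
            *degree += 1;
        }
    }
}
```

Calling two separate methods on `&mut self` (one to read rows, one to write degrees) would not compile while the row iterator is alive, which is why a single method returns both references.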
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinHashMap<K, S, E>
pub fn new(
watermark_sequence: AtomicU64Ref,
join_key_data_types: Vec<DataType>,
state_join_key_indices: Vec<usize>,
state_all_data_types: Vec<DataType>,
state_table: StateTable<S>,
state_pk_indices: Vec<usize>,
degree_state: Option<TableInner<S>>,
null_matched: K::Bitmap,
pk_contained_in_jk: bool,
inequality_key_idx: Option<usize>,
metrics: Arc<StreamingMetrics>,
actor_id: ActorId,
fragment_id: FragmentId,
side: &'static str,
) -> Self
Create a JoinHashMap with the given LRU capacity.
pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinHashMap<K, S, E>
pub fn update_watermark(&mut self, watermark: ScalarImpl)
pub fn take_state_opt(&mut self, key: &K) -> CacheResult<E>
Take the state for the given key out of the hash table and return it. One MUST call
update_state after some operations to put the state back.
If the state does not exist in the cache, fetch it from remote storage and return it. If it
does not exist in remote storage either, a JoinEntryState with an empty cache will be
returned.
Note: This will NOT remove anything from remote storage.
pub async fn take_state(
&mut self,
key: &K,
) -> StreamExecutorResult<Box<JoinEntryState<E>>>
Take the state for the given key out of the hash table and return it. One MUST call
update_state after some operations to put the state back.
If the state does not exist in the cache, fetch it from remote storage and return it. If it
does not exist in remote storage either, a JoinEntryState with an empty cache will be
returned.
Note: This will NOT remove anything from remote storage.
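The take-then-put-back contract can be illustrated with a synchronous toy cache. This is only a sketch: `ToyJoinMap` and `EntryState` are hypothetical stand-ins, the "remote" store is an in-memory map, and the real methods are async and fallible.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `JoinEntryState`: the cached rows for one join key.
#[derive(Debug, Default, PartialEq)]
struct EntryState {
    rows: Vec<String>,
}

/// Hypothetical stand-in for `JoinHashMap`: a cache in front of a "remote" store.
struct ToyJoinMap {
    cache: HashMap<u64, Box<EntryState>>,
    remote: HashMap<u64, Vec<String>>,
}

impl ToyJoinMap {
    /// Removes the entry from the cache, or rebuilds it from `remote` on a miss.
    /// Returns an empty state if the key is in neither place.
    /// `remote` is only read, never modified.
    fn take_state(&mut self, key: u64) -> Box<EntryState> {
        self.cache.remove(&key).unwrap_or_else(|| {
            let rows = self.remote.get(&key).cloned().unwrap_or_default();
            Box::new(EntryState { rows })
        })
    }

    /// Puts a previously taken state back into the cache.
    fn update_state(&mut self, key: u64, state: Box<EntryState>) {
        self.cache.insert(key, state);
    }
}
```

Between `take_state` and `update_state` the entry is absent from the cache, so in this model forgetting the second call means the next lookup rebuilds the entry from the remote copy and loses any in-memory changes.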
async fn fetch_cached_state(
&self,
key: &K,
) -> StreamExecutorResult<JoinEntryState<E>>
Fetch the cached state from the state store. Should only be called if the key does not exist in memory.
Will return an empty JoinEntryState even when the state does not exist in remote storage.
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<JoinHashMapPostCommit<'_, K, S, E>>
pub async fn try_flush(&mut self) -> StreamExecutorResult<()>
pub fn insert_handle_degree(&mut self, key: &K, value: JoinRow<impl Row>) -> StreamExecutorResult<()>
pub fn insert(
&mut self,
key: &K,
value: JoinRow<impl Row>,
) -> StreamExecutorResult<()>
Insert a join row
pub fn insert_row(
&mut self,
key: &K,
value: impl Row,
) -> StreamExecutorResult<()>
Insert a row. Used when the side does not need to update degree.
pub fn delete_handle_degree(&mut self, key: &K, value: JoinRow<impl Row>) -> StreamExecutorResult<()>
pub fn delete(
&mut self,
key: &K,
value: JoinRow<impl Row>,
) -> StreamExecutorResult<()>
Delete a join row
pub fn delete_row(
&mut self,
key: &K,
value: impl Row,
) -> StreamExecutorResult<()>
Delete a row. Used when the side does not need to update degree.
pub fn update_state(&mut self, key: &K, state: Box<JoinEntryState<E>>)
Update a JoinEntryState into the hash table.
pub fn entry_count(&self) -> usize
Cached entry count for this hash table.
pub fn null_matched(&self) -> &K::Bitmap
pub fn table_id(&self) -> u32
pub fn join_key_data_types(&self) -> &[DataType]
pub fn check_inequal_key_null(&self, row: &impl Row) -> bool
pub fn serialize_pk_from_row(&self, row: impl Row) -> Vec<u8>
Auto Trait Implementations
impl<K, S, E> !Freeze for JoinHashMap<K, S, E>
impl<K, S, E> !RefUnwindSafe for JoinHashMap<K, S, E>
impl<K, S, E> Send for JoinHashMap<K, S, E>
impl<K, S, E> Sync for JoinHashMap<K, S, E>
impl<K, S, E> Unpin for JoinHashMap<K, S, E>
impl<K, S, E> !UnwindSafe for JoinHashMap<K, S, E>