pub struct JoinHashMap<K: HashKey, S: StateStore> {
    inner: ManagedLruCache<K, HashValueWrapper, PrecomputedBuildHasher, SharedStatsAlloc<Global>>,
    join_key_data_types: Vec<DataType>,
    null_matched: K::Bitmap,
    pk_serializer: OrderedRowSerde,
    state: TableInner<S>,
    degree_state: Option<TableInner<S>>,
    need_degree_table: bool,
    pk_contained_in_jk: bool,
    inequality_key_desc: Option<InequalityKeyDesc>,
    metrics: JoinHashMapMetrics,
}
Fields
inner: ManagedLruCache<K, HashValueWrapper, PrecomputedBuildHasher, SharedStatsAlloc<Global>>
Store the join states.
join_key_data_types: Vec<DataType>
Data types of the join key columns
null_matched: K::Bitmap
Null safe bitmap for each join pair
pk_serializer: OrderedRowSerde
The memcomparable serializer of primary key.
state: TableInner<S>
State table. Contains the data from upstream.
degree_state: Option<TableInner<S>>
Degree table.
The degree is generated from the hash join executor. Each row in state has a corresponding degree in degree_state. A degree value d for a row means the row has d matched rows on the other join side.
It is only used on the sides that need it:
- Full Outer: both sides
- Left Outer/Semi/Anti: left side
- Right Outer/Semi/Anti: right side
- Inner: neither side
Should be set to None if need_degree_table was set to false.
The degree of each row tells us whether we need to emit NULL for the row. For instance, given lhs LEFT JOIN rhs: if the degree of a row in lhs is 0, the row has no match in rhs; if the degree of a row in lhs is 2, the row has two matches in rhs. When emitting the result of the join, we need to emit NULL for the row in lhs if the degree is 0.
Why not just use a boolean value instead of a degree count? Consider the case where we delete a matched record from rhs. Since we can delete a record, there must have been a record in rhs that matched the record in lhs, so the boolean is true. But after removing this record we do not know how many matched records remain, since we stored only a boolean rather than the count. Hence we need to store the count of matched records.
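The degree bookkeeping described for degree_state can be illustrated with a small, self-contained sketch. Everything here is hypothetical and not part of this crate: the JoinType enum, the need_degree_table function, and the DegreeTable type (an in-memory map from an lhs row id to its match count) are stand-ins chosen for illustration, assuming lhs LEFT JOIN rhs.

```rust
use std::collections::HashMap;

/// Hypothetical join types, mirroring the list in the field documentation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    LeftOuter,
    LeftSemi,
    LeftAnti,
    RightOuter,
    RightSemi,
    RightAnti,
    FullOuter,
}

/// Returns (left side needs a degree table, right side needs a degree table).
pub fn need_degree_table(join_type: JoinType) -> (bool, bool) {
    match join_type {
        JoinType::FullOuter => (true, true),
        JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => (true, false),
        JoinType::RightOuter | JoinType::RightSemi | JoinType::RightAnti => (false, true),
        JoinType::Inner => (false, false),
    }
}

/// Toy degree table for the lhs: lhs row id -> number of matching rhs rows.
#[derive(Default)]
pub struct DegreeTable {
    degrees: HashMap<u64, u64>,
}

impl DegreeTable {
    /// A matching rhs row arrived: increment the degree of the lhs row.
    pub fn on_rhs_insert(&mut self, lhs_row: u64) {
        *self.degrees.entry(lhs_row).or_insert(0) += 1;
    }

    /// A matching rhs row was deleted: decrement the degree of the lhs row.
    pub fn on_rhs_delete(&mut self, lhs_row: u64) {
        if let Some(d) = self.degrees.get_mut(&lhs_row) {
            *d = d.saturating_sub(1);
        }
    }

    /// Current degree; rows that were never matched have degree 0.
    pub fn degree(&self, lhs_row: u64) -> u64 {
        self.degrees.get(&lhs_row).copied().unwrap_or(0)
    }

    /// The lhs row must be emitted with NULLs for rhs only when its degree is 0.
    pub fn needs_null_padding(&self, lhs_row: u64) -> bool {
        self.degree(lhs_row) == 0
    }
}
```

With two matches recorded for a row, one delete leaves the degree at 1, so the row still has a match; a plain boolean could not distinguish that from the case where the last match was removed.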
need_degree_table: bool
Whether the degree table is needed.
pk_contained_in_jk: bool
Pk is part of the join key.
inequality_key_desc: Option<InequalityKeyDesc>
Inequality key description for AsOf join.
metrics: JoinHashMapMetrics
Metrics of the hash map
Implementations
impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
pub(crate) fn get_degree_state_mut_ref( &mut self, ) -> (&[usize], &mut Option<TableInner<S>>)
pub(crate) async fn fetch_matched_rows_and_get_degree_table_ref<'a>( &'a mut self, key: &'a K, ) -> StreamExecutorResult<(impl Stream<Item = StreamExecutorResult<(Vec<u8>, JoinRow<OwnedRow>)>> + 'a, &'a [usize], &'a mut Option<TableInner<S>>)>
NOTE(kwannoel): This allows us to concurrently stream records from the state_table and update the degree table, without using unsafe code. This is because we obtain separate references to separate parts of the JoinHashMap, instead of reusing the same reference to JoinHashMap for concurrent read access to state_table and write access to the degree table.
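The borrow-splitting idea in the note above can be sketched with a toy type. This is only an illustration under stated assumptions: ToyJoinMap, its fields, and its methods are hypothetical, a Vec stands in for the state table, and a synchronous iterator stands in for the async stream. The point is that one method hands out a shared borrow of one field and a mutable borrow of another, so the caller can read and write at the same time without unsafe code.

```rust
/// Hypothetical stand-in for JoinHashMap with two independently borrowed parts.
pub struct ToyJoinMap {
    /// Stands in for the state table: (join key, row payload).
    state_rows: Vec<(u64, String)>,
    /// Stands in for the optional degree table: one degree per state row.
    degrees: Option<Vec<u64>>,
}

impl ToyJoinMap {
    pub fn new(rows: Vec<(u64, String)>, with_degree: bool) -> Self {
        let degrees = with_degree.then(|| vec![0; rows.len()]);
        Self { state_rows: rows, degrees }
    }

    /// Returns a read-only iterator over matching rows together with a mutable
    /// reference to the degree table. The two borrows touch disjoint fields,
    /// so the borrow checker accepts them side by side.
    pub fn rows_and_degree_mut<'a>(
        &'a mut self,
        key: u64,
    ) -> (
        impl Iterator<Item = (usize, &'a String)> + 'a,
        &'a mut Option<Vec<u64>>,
    ) {
        let rows = self
            .state_rows
            .iter()
            .enumerate()
            .filter(move |(_, (k, _))| *k == key)
            .map(|(i, (_, v))| (i, v));
        (rows, &mut self.degrees)
    }

    /// Iterates matching rows while bumping their degrees; returns the match count.
    pub fn bump_matches(&mut self, key: u64) -> usize {
        let (rows, degrees) = self.rows_and_degree_mut(key);
        let mut matched = 0;
        for (idx, _row) in rows {
            if let Some(d) = degrees.as_mut() {
                d[idx] += 1;
            }
            matched += 1;
        }
        matched
    }

    /// Degree of the row at `idx`, or None when no degree table is kept.
    pub fn degree_of(&self, idx: usize) -> Option<u64> {
        self.degrees.as_ref().map(|d| d[idx])
    }
}
```

Had bump_matches instead called one method borrowing self for the rows and another borrowing self mutably for the degrees, the two whole-struct borrows would conflict; returning both references from a single call avoids that.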
impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
pub fn new( watermark_sequence: AtomicU64Ref, join_key_data_types: Vec<DataType>, state_join_key_indices: Vec<usize>, state_all_data_types: Vec<DataType>, state_table: StateTable<S>, state_pk_indices: Vec<usize>, degree_state: Option<TableInner<S>>, null_matched: K::Bitmap, pk_contained_in_jk: bool, inequality_key_idx: Option<usize>, metrics: Arc<StreamingMetrics>, actor_id: ActorId, fragment_id: FragmentId, side: &'static str, ) -> Self
Create a JoinHashMap with the given LRU capacity.
pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>
impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
pub fn update_watermark(&mut self, watermark: ScalarImpl)
pub fn take_state_opt(&mut self, key: &K) -> CacheResult
Take the state for the given key out of the hash table and return it. One MUST call update_state after some operations to put the state back.
If the state does not exist in the cache, fetch it from remote storage and return it. If it still does not exist in remote storage, a JoinEntryState with an empty cache will be returned.
Note: This will NOT remove anything from remote storage.
pub async fn take_state<'a>( &mut self, key: &K, ) -> StreamExecutorResult<Box<JoinEntryState>>
Take the state for the given key out of the hash table and return it. One MUST call update_state after some operations to put the state back.
If the state does not exist in the cache, fetch it from remote storage and return it. If it still does not exist in remote storage, a JoinEntryState with an empty cache will be returned.
Note: This will NOT remove anything from remote storage.
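The take-then-put-back contract can be sketched with a toy cache. This is a minimal illustration, not this crate's implementation: ToyCache, ToyEntryState, and the in-memory remote map are hypothetical, and everything is synchronous for brevity. Taking a state removes it from the cache only; a cache miss falls back to the remote map, and a miss there yields an empty state.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for JoinEntryState.
#[derive(Debug, Default, PartialEq)]
pub struct ToyEntryState {
    pub rows: Vec<String>,
}

/// Hypothetical cache in front of a "remote" store (here just another map).
pub struct ToyCache {
    cache: HashMap<u64, Box<ToyEntryState>>,
    remote: HashMap<u64, Vec<String>>,
}

impl ToyCache {
    pub fn new(remote: HashMap<u64, Vec<String>>) -> Self {
        Self { cache: HashMap::new(), remote }
    }

    /// Takes the state out of the cache. On a miss, rebuilds it from the remote
    /// store; if the key is absent there too, returns an empty state.
    /// Nothing is ever removed from the remote store.
    pub fn take_state(&mut self, key: u64) -> Box<ToyEntryState> {
        if let Some(state) = self.cache.remove(&key) {
            return state;
        }
        let rows = self.remote.get(&key).cloned().unwrap_or_default();
        Box::new(ToyEntryState { rows })
    }

    /// Puts a previously taken state back into the cache.
    pub fn update_state(&mut self, key: u64, state: Box<ToyEntryState>) {
        self.cache.insert(key, state);
    }

    /// Typical caller pattern: take, mutate, put back.
    /// The toy also appends to the remote map so both views stay in sync.
    pub fn insert_row(&mut self, key: u64, row: &str) {
        let mut state = self.take_state(key);
        state.rows.push(row.to_string());
        self.remote.entry(key).or_default().push(row.to_string());
        self.update_state(key, state);
    }

    /// Number of entries currently held in the cache.
    pub fn cached_entries(&self) -> usize {
        self.cache.len()
    }
}
```

Forgetting the update_state call in this sketch would not lose data, but every later access to that key would miss the cache and go back to the remote map, which is the cost the MUST in the documentation guards against.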
async fn fetch_cached_state( &self, key: &K, ) -> StreamExecutorResult<JoinEntryState>
Fetch cache from the state store. Should only be called if the key does not exist in memory. Will return an empty JoinEntryState even when the state does not exist in remote storage.
pub async fn flush( &mut self, epoch: EpochPair, ) -> StreamExecutorResult<JoinHashMapPostCommit<'_, K, S>>
pub async fn try_flush(&mut self) -> StreamExecutorResult<()>
pub fn insert_handle_degree( &mut self, key: &K, value: JoinRow<impl Row>, ) -> StreamExecutorResult<()>
pub fn insert( &mut self, key: &K, value: JoinRow<impl Row>, ) -> StreamExecutorResult<()>
Insert a join row.
pub fn insert_row( &mut self, key: &K, value: impl Row, ) -> StreamExecutorResult<()>
Insert a row. Used when the side does not need to update degree.
pub fn delete_handle_degree( &mut self, key: &K, value: JoinRow<impl Row>, ) -> StreamExecutorResult<()>
pub fn delete( &mut self, key: &K, value: JoinRow<impl Row>, ) -> StreamExecutorResult<()>
Delete a join row.
pub fn delete_row( &mut self, key: &K, value: impl Row, ) -> StreamExecutorResult<()>
Delete a row. Used when the side does not need to update degree.
pub fn update_state(&mut self, key: &K, state: Box<JoinEntryState>)
Put the given JoinEntryState back into the hash table.
pub fn entry_count(&self) -> usize
Cached entry count for this hash table.
pub fn null_matched(&self) -> &K::Bitmap
pub fn table_id(&self) -> u32
pub fn join_key_data_types(&self) -> &[DataType]
pub fn check_inequal_key_null(&self, row: &impl Row) -> bool
pub fn serialize_pk_from_row(&self, row: impl Row) -> Vec<u8>
Auto Trait Implementations
impl<K, S> !Freeze for JoinHashMap<K, S>
impl<K, S> !RefUnwindSafe for JoinHashMap<K, S>
impl<K, S> Send for JoinHashMap<K, S>
impl<K, S> Sync for JoinHashMap<K, S>
impl<K, S> Unpin for JoinHashMap<K, S>
impl<K, S> !UnwindSafe for JoinHashMap<K, S>
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<T> Downcast for T where T: Any
fn into_any(self: Box<T>) -> Box<dyn Any>
Converts Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>, which can then be downcast into Box<dyn ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Converts Rc<Trait> (where Trait: Downcast) to Rc<Any>, which can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Converts &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Converts &mut Trait (where Trait: Downcast) to &mut Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.
impl<T> DowncastSend for T
impl<T> DowncastSync for T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self> where for<'a> &'a Self: IntoIterator
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
impl<T, U, I> LiftInto<U, I> for T where U: LiftFrom<T, I>
impl<M> MetricVecRelabelExt for M
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level.
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n.
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to MetricLevel::Debug and relabel_num set to 1.
impl<T> Pipe for T where T: ?Sized
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> Scope for T
impl<Source> Sculptor<HNil, HNil> for Source
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.