pub struct JoinHashMap<K: HashKey, S: StateStore> {
inner: ManagedLruCache<K, HashValueWrapper, PrecomputedBuildHasher, SharedStatsAlloc<Global>>,
join_key_data_types: Vec<DataType>,
null_matched: K::Bitmap,
pk_serializer: OrderedRowSerde,
state: TableInner<S>,
degree_state: Option<TableInner<S>>,
need_degree_table: bool,
pk_contained_in_jk: bool,
inequality_key_desc: Option<InequalityKeyDesc>,
metrics: JoinHashMapMetrics,
}
Fields§
§inner: ManagedLruCache<K, HashValueWrapper, PrecomputedBuildHasher, SharedStatsAlloc<Global>>
Store the join states.
join_key_data_types: Vec<DataType>
Data types of the join key columns
null_matched: K::Bitmap
Null safe bitmap for each join pair
pk_serializer: OrderedRowSerde
The memcomparable serializer of primary key.
state: TableInner<S>
State table. Contains the data from upstream.
degree_state: Option<TableInner<S>>
Degree table.
The degree is generated from the hash join executor.
Each row in state
has a corresponding degree in degree state
.
A degree value d
in for a row means the row has d
matched row in the other join side.
It will only be used when needed in a side.
- Full Outer: both side
- Left Outer/Semi/Anti: left side
- Right Outer/Semi/Anti: right side
- Inner: neither side.
Should be set to None
if need_degree_table
was set to false
.
need_degree_table: bool
If degree table is need
pk_contained_in_jk: bool
Pk is part of the join key.
inequality_key_desc: Option<InequalityKeyDesc>
Inequality key description for AsOf
join.
metrics: JoinHashMapMetrics
Metrics of the hash map
Implementations§
source§impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
sourcepub fn new(
watermark_sequence: AtomicU64Ref,
join_key_data_types: Vec<DataType>,
state_join_key_indices: Vec<usize>,
state_all_data_types: Vec<DataType>,
state_table: StateTable<S>,
state_pk_indices: Vec<usize>,
degree_state: Option<TableInner<S>>,
null_matched: K::Bitmap,
pk_contained_in_jk: bool,
inequality_key_idx: Option<usize>,
metrics: Arc<StreamingMetrics>,
actor_id: ActorId,
fragment_id: FragmentId,
side: &'static str,
) -> Self
pub fn new( watermark_sequence: AtomicU64Ref, join_key_data_types: Vec<DataType>, state_join_key_indices: Vec<usize>, state_all_data_types: Vec<DataType>, state_table: StateTable<S>, state_pk_indices: Vec<usize>, degree_state: Option<TableInner<S>>, null_matched: K::Bitmap, pk_contained_in_jk: bool, inequality_key_idx: Option<usize>, metrics: Arc<StreamingMetrics>, actor_id: ActorId, fragment_id: FragmentId, side: &'static str, ) -> Self
Create a JoinHashMap
with the given LRU capacity.
pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>
sourcepub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) -> bool
pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) -> bool
Update the vnode bitmap and manipulate the cache if necessary.
pub fn update_watermark(&mut self, watermark: ScalarImpl)
sourcepub async fn take_state<'a>(
&mut self,
key: &K,
) -> StreamExecutorResult<Box<JoinEntryState>>
pub async fn take_state<'a>( &mut self, key: &K, ) -> StreamExecutorResult<Box<JoinEntryState>>
Take the state for the given key
out of the hash table and return it. One MUST call
update_state
after some operations to put the state back.
If the state does not exist in the cache, fetch the remote storage and return. If it still
does not exist in the remote storage, a JoinEntryState
with empty cache will be
returned.
Note: This will NOT remove anything from remote storage.
sourceasync fn fetch_cached_state(
&self,
key: &K,
) -> StreamExecutorResult<JoinEntryState>
async fn fetch_cached_state( &self, key: &K, ) -> StreamExecutorResult<JoinEntryState>
Fetch cache from the state store. Should only be called if the key does not exist in memory.
Will return a empty JoinEntryState
even when state does not exist in remote.
pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>
pub async fn try_flush(&mut self) -> StreamExecutorResult<()>
sourcepub fn insert(
&mut self,
key: &K,
value: JoinRow<impl Row>,
) -> StreamExecutorResult<()>
pub fn insert( &mut self, key: &K, value: JoinRow<impl Row>, ) -> StreamExecutorResult<()>
Insert a join row
sourcepub fn insert_row(
&mut self,
key: &K,
value: impl Row,
) -> StreamExecutorResult<()>
pub fn insert_row( &mut self, key: &K, value: impl Row, ) -> StreamExecutorResult<()>
Insert a row. Used when the side does not need to update degree.
sourcepub fn delete(
&mut self,
key: &K,
value: JoinRow<impl Row>,
) -> StreamExecutorResult<()>
pub fn delete( &mut self, key: &K, value: JoinRow<impl Row>, ) -> StreamExecutorResult<()>
Delete a join row
sourcepub fn delete_row(
&mut self,
key: &K,
value: impl Row,
) -> StreamExecutorResult<()>
pub fn delete_row( &mut self, key: &K, value: impl Row, ) -> StreamExecutorResult<()>
Delete a row Used when the side does not need to update degree.
sourcepub fn update_state(&mut self, key: &K, state: Box<JoinEntryState>)
pub fn update_state(&mut self, key: &K, state: Box<JoinEntryState>)
Update a JoinEntryState
into the hash table.
sourcefn manipulate_degree(
&mut self,
join_row_ref: &mut EncodedJoinRow,
join_row: &mut JoinRow<OwnedRow>,
action: impl Fn(&mut u64),
)
fn manipulate_degree( &mut self, join_row_ref: &mut EncodedJoinRow, join_row: &mut JoinRow<OwnedRow>, action: impl Fn(&mut u64), )
Manipulate the degree of the given JoinRow
and EncodedJoinRow
with action
, both in
memory and in the degree table.
sourcepub fn inc_degree(
&mut self,
join_row_ref: &mut EncodedJoinRow,
join_row: &mut JoinRow<OwnedRow>,
)
pub fn inc_degree( &mut self, join_row_ref: &mut EncodedJoinRow, join_row: &mut JoinRow<OwnedRow>, )
Increment the degree of the given JoinRow
and EncodedJoinRow
with action
, both in
memory and in the degree table.
sourcepub fn dec_degree(
&mut self,
join_row_ref: &mut EncodedJoinRow,
join_row: &mut JoinRow<OwnedRow>,
)
pub fn dec_degree( &mut self, join_row_ref: &mut EncodedJoinRow, join_row: &mut JoinRow<OwnedRow>, )
Decrement the degree of the given JoinRow
and EncodedJoinRow
with action
, both in
memory and in the degree table.
sourcepub fn entry_count(&self) -> usize
pub fn entry_count(&self) -> usize
Cached entry count for this hash table.
pub fn null_matched(&self) -> &K::Bitmap
pub fn table_id(&self) -> u32
pub fn join_key_data_types(&self) -> &[DataType]
sourcepub fn check_inequal_key_null(&self, row: &impl Row) -> bool
pub fn check_inequal_key_null(&self, row: &impl Row) -> bool
pub fn serialize_pk_from_row(&self, row: impl Row) -> Vec<u8> ⓘ
Auto Trait Implementations§
impl<K, S> !Freeze for JoinHashMap<K, S>
impl<K, S> !RefUnwindSafe for JoinHashMap<K, S>
impl<K, S> Send for JoinHashMap<K, S>
impl<K, S> Sync for JoinHashMap<K, S>
impl<K, S> Unpin for JoinHashMap<K, S>
impl<K, S> !UnwindSafe for JoinHashMap<K, S>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait>
(where Trait: Downcast
) to Box<dyn Any>
. Box<dyn Any>
can
then be further downcast
into Box<ConcreteType>
where ConcreteType
implements Trait
.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait>
(where Trait: Downcast
) to Rc<Any>
. Rc<Any>
can then be
further downcast
into Rc<ConcreteType>
where ConcreteType
implements Trait
.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &Any
’s vtable from &Trait
’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait
(where Trait: Downcast
) to &Any
. This is needed since Rust cannot
generate &mut Any
’s vtable from &mut Trait
’s.§impl<T> DowncastSync for T
impl<T> DowncastSync for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moresource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
§impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
source§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
source§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level
.source§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
.source§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n
with metric_level
set to
MetricLevel::Debug
and relabel_num
set to 1.§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<Source> Sculptor<HNil, HNil> for Source
impl<Source> Sculptor<HNil, HNil> for Source
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.