risingwave_stream::executor::join::hash_join

Struct JoinHashMap

pub struct JoinHashMap<K: HashKey, S: StateStore> {
    inner: ManagedLruCache<K, HashValueWrapper, PrecomputedBuildHasher, SharedStatsAlloc<Global>>,
    join_key_data_types: Vec<DataType>,
    null_matched: K::Bitmap,
    pk_serializer: OrderedRowSerde,
    state: TableInner<S>,
    degree_state: Option<TableInner<S>>,
    need_degree_table: bool,
    pk_contained_in_jk: bool,
    inequality_key_desc: Option<InequalityKeyDesc>,
    metrics: JoinHashMapMetrics,
}

Fields

inner: ManagedLruCache<K, HashValueWrapper, PrecomputedBuildHasher, SharedStatsAlloc<Global>>

LRU cache that stores the join states, keyed by join key.

join_key_data_types: Vec<DataType>

Data types of the join key columns.

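null_matched: K::Bitmap

Null-safe bitmap with one bit per join key pair. A set bit means NULL values in that key column are treated as equal (null-safe equality, as in IS NOT DISTINCT FROM); otherwise a NULL key never matches.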
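A minimal sketch of what per-column null-safe matching means, assuming join keys are represented as Vec<Option<i64>> and the bitmap as a plain &[bool]. The function keys_match is hypothetical; it does not reproduce K::Bitmap or the executor's actual key comparison.

```rust
/// Hypothetical helper: compares two join keys column by column.
/// `null_safe[i] == true` means column i uses null-safe equality
/// (NULL matches NULL, as in `IS NOT DISTINCT FROM`); otherwise a NULL
/// on either side never matches (plain SQL `=` semantics).
fn keys_match(lhs: &[Option<i64>], rhs: &[Option<i64>], null_safe: &[bool]) -> bool {
    assert_eq!(lhs.len(), rhs.len());
    assert_eq!(lhs.len(), null_safe.len());
    lhs.iter()
        .zip(rhs)
        .zip(null_safe)
        .all(|((l, r), &safe)| match (l, r) {
            (Some(a), Some(b)) => a == b,
            // Both NULL: a match only if this column is null-safe.
            (None, None) => safe,
            // Exactly one side NULL: never a match.
            _ => false,
        })
}
```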

pk_serializer: OrderedRowSerde

Memcomparable serializer for the primary key.
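"Memcomparable" means the encoded bytes compare (lexicographically) in the same order as the original values, so keys can be ordered without decoding. The sketch below shows the idea for i64 columns only: flip the sign bit and write big-endian. It is an illustration under that assumption and does not reproduce OrderedRowSerde's actual format, which also has to handle NULLs, sort direction, and other data types.

```rust
/// Encodes an i64 so that comparing the encoded bytes lexicographically
/// gives the same order as comparing the integers.
fn encode_i64(v: i64) -> [u8; 8] {
    // Flipping the sign bit maps i64::MIN..=i64::MAX monotonically onto
    // 0..=u64::MAX; big-endian puts the most significant byte first.
    ((v as u64) ^ (1u64 << 63)).to_be_bytes()
}

/// Encodes a composite key by concatenating fixed-width encodings, so the
/// byte order follows column order (the first column is most significant).
fn encode_key(cols: &[i64]) -> Vec<u8> {
    cols.iter().flat_map(|&c| encode_i64(c)).collect()
}
```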

state: TableInner<S>

State table holding the rows received from upstream.

degree_state: Option<TableInner<S>>

Degree table.

The degree is maintained by the hash join executor. Each row in the state table has a corresponding degree in the degree table: a degree value d for a row means the row has d matched rows on the other join side.

The degree table is only kept for the sides that need it:

  • Full Outer: both sides
  • Left Outer/Semi/Anti: left side
  • Right Outer/Semi/Anti: right side
  • Inner: neither side
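The list above can be read as a function from join type to the pair (left needs degree, right needs degree). The JoinType enum and need_degree_table function below are hypothetical names for illustration; they only encode the mapping stated in the list.

```rust
/// Join types covered by the list above (hypothetical enum for illustration).
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum JoinType {
    Inner,
    LeftOuter,
    LeftSemi,
    LeftAnti,
    RightOuter,
    RightSemi,
    RightAnti,
    FullOuter,
}

/// Returns (left_needs_degree, right_needs_degree) for a join type.
fn need_degree_table(join_type: JoinType) -> (bool, bool) {
    match join_type {
        JoinType::FullOuter => (true, true),
        JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => (true, false),
        JoinType::RightOuter | JoinType::RightSemi | JoinType::RightAnti => (false, true),
        JoinType::Inner => (false, false),
    }
}
```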

Should be set to None if need_degree_table was set to false.

The degree of each row tells us whether we need to emit NULLs for it. For instance, given lhs LEFT JOIN rhs, if the degree of a row in lhs is 0, the row has no match in rhs; if it is 2, the row has two matches in rhs. When emitting the join result, an lhs row with degree 0 must be emitted padded with NULLs for the rhs columns.

Why not just use a boolean instead of a degree count? Consider deleting a matched record from rhs. Since that record existed, it matched the lhs row, so the boolean is true. After the deletion, however, we cannot tell whether any matches remain, because a boolean does not record how many records matched. Storing the count solves this: decrement it on each deletion, and the row becomes unmatched exactly when the count reaches 0.
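A minimal sketch of the count-based bookkeeping for the lhs of lhs LEFT JOIN rhs, assuming lhs rows are identified by a u64 id and degrees live in a HashMap. The Degrees type and its methods are hypothetical; the return values mark the 0 to 1 and 1 to 0 transitions, where (in a typical streaming left join) a NULL-padded output row must be retracted or emitted again. A boolean could detect the first transition but not the second.

```rust
use std::collections::HashMap;

/// Hypothetical per-row degree tracking for the lhs of `lhs LEFT JOIN rhs`.
/// Keys are lhs row ids; values are the number of matching rhs rows.
struct Degrees {
    degree: HashMap<u64, u64>,
}

impl Degrees {
    fn new() -> Self {
        Self { degree: HashMap::new() }
    }

    /// Registers a new lhs row with no matches yet.
    fn add_lhs_row(&mut self, lhs_id: u64) {
        self.degree.insert(lhs_id, 0);
    }

    /// An rhs row matching `lhs_id` was inserted. Returns true when the lhs
    /// row goes from 0 to 1 match (its NULL-padded output must be retracted).
    fn on_rhs_insert(&mut self, lhs_id: u64) -> bool {
        let d = self.degree.get_mut(&lhs_id).expect("unknown lhs row");
        *d += 1;
        *d == 1
    }

    /// An rhs row matching `lhs_id` was deleted. Returns true when the lhs
    /// row drops back to 0 matches (a NULL-padded row must be emitted again).
    fn on_rhs_delete(&mut self, lhs_id: u64) -> bool {
        let d = self.degree.get_mut(&lhs_id).expect("unknown lhs row");
        *d = d.checked_sub(1).expect("degree underflow: inconsistent stream");
        *d == 0
    }
}
```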

need_degree_table: bool

Whether this side needs a degree table.

pk_contained_in_jk: bool

Whether the primary key is contained in the join key.

inequality_key_desc: Option<InequalityKeyDesc>

Inequality key description for AsOf join.
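In an AsOf join, rows that share the equality join key are additionally matched on an inequality key, picking the closest row that satisfies the inequality (for example, the latest rhs row whose timestamp is <= the lhs timestamp). The sketch below shows that lookup for a single join key, assuming an integer inequality key, a <= condition, and a BTreeMap as the ordered store; asof_lookup is a hypothetical name and this is not how InequalityKeyDesc is implemented.

```rust
use std::collections::BTreeMap;

/// Hypothetical rhs state for one join key: inequality key (e.g. a timestamp)
/// -> payload. The ordered map lets the closest match be found by a range scan.
fn asof_lookup<'a>(rhs: &'a BTreeMap<i64, &'a str>, lhs_ts: i64) -> Option<&'a str> {
    // Latest rhs entry whose inequality key is <= lhs_ts.
    rhs.range(..=lhs_ts).next_back().map(|(_, v)| *v)
}
```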

metrics: JoinHashMapMetrics

Metrics of the hash map.

Implementations

impl<K: HashKey, S: StateStore> JoinHashMap<K, S>

pub(crate) fn get_degree_state_mut_ref(&mut self) -> (&[usize], &mut Option<TableInner<S>>)

pub(crate) async fn fetch_matched_rows_and_get_degree_table_ref<'a>(&'a mut self, key: &'a K) -> StreamExecutorResult<(impl Stream<Item = StreamExecutorResult<(Vec<u8>, JoinRow<OwnedRow>)>> + 'a, &'a [usize], &'a mut Option<TableInner<S>>)>

NOTE(kwannoel): This allows us to concurrently stream records from the state_table and update the degree table without using unsafe code.

This works because we obtain separate references to disjoint parts of the JoinHashMap, instead of reusing one reference to the whole JoinHashMap for both read access to state_table and write access to the degree table.
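The borrow-splitting idea can be sketched with a simplified stand-in: a method returns a shared borrow of one field and a mutable borrow of another, and the caller uses both at once. The Map struct, split, and bump_matches below are hypothetical; the real method returns a stream over state_table rather than a slice, but the borrow-checker reasoning is the same.

```rust
/// Hypothetical, simplified stand-in for JoinHashMap.
struct Map {
    state: Vec<(u64, i64)>,   // (row id, value) rows to stream
    degree: Option<Vec<u64>>, // per-row degree, if this side keeps one
}

impl Map {
    /// Returns a shared borrow of `state` and a mutable borrow of `degree`.
    /// The borrow checker accepts this because the two fields do not overlap,
    /// and the caller can then hold both references at the same time.
    fn split(&mut self) -> (&[(u64, i64)], &mut Option<Vec<u64>>) {
        (self.state.as_slice(), &mut self.degree)
    }

    /// Streams over the rows while bumping the degree of each matching row.
    fn bump_matches(&mut self, key: i64) -> usize {
        let (rows, degree) = self.split();
        let mut matched = 0;
        for (i, &(_, v)) in rows.iter().enumerate() {
            if v == key {
                matched += 1;
                if let Some(d) = degree.as_mut() {
                    d[i] += 1;
                }
            }
        }
        matched
    }
}
```

Borrowing the whole Map once mutably and once immutably at the same time would be rejected; splitting into per-field references avoids that without unsafe code.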

impl<K: HashKey, S: StateStore> JoinHashMap<K, S>

pub fn new(watermark_sequence: AtomicU64Ref, join_key_data_types: Vec<DataType>, state_join_key_indices: Vec<usize>, state_all_data_types: Vec<DataType>, state_table: StateTable<S>, state_pk_indices: Vec<usize>, degree_state: Option<TableInner<S>>, null_matched: K::Bitmap, pk_contained_in_jk: bool, inequality_key_idx: Option<usize>, metrics: Arc<StreamingMetrics>, actor_id: ActorId, fragment_id: FragmentId, side: &'static str) -> Self

Create a JoinHashMap whose in-memory join states are held in an LRU cache.

pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>

impl<K: HashKey, S: StateStore> JoinHashMap<K, S>

pub fn update_watermark(&mut self, watermark: ScalarImpl)

pub fn take_state_opt(&mut self, key: &K) -> CacheResult

Take the state for the given key out of the hash table and return it. The caller MUST call update_state afterwards to put the state back.

If the state does not exist in the cache, it is fetched from remote storage and returned. If it does not exist in remote storage either, a JoinEntryState with an empty cache is returned.

Note: this does NOT remove anything from remote storage.

pub async fn take_state<'a>(&mut self, key: &K) -> StreamExecutorResult<Box<JoinEntryState>>

Take the state for the given key out of the hash table and return it. The caller MUST call update_state afterwards to put the state back.

If the state does not exist in the cache, it is fetched from remote storage and returned. If it does not exist in remote storage either, a JoinEntryState with an empty cache is returned.

Note: this does NOT remove anything from remote storage.
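The take/put-back contract can be sketched with two HashMaps standing in for the in-memory cache and remote storage. EntryState and Store are hypothetical stand-ins (not JoinEntryState or the real state store), and the lookup is synchronous here for brevity. Removing the entry from the cache gives the caller exclusive ownership while it works on the state; update_state reinserts it.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for JoinEntryState: the cached rows for one join key.
#[derive(Debug, Default, Clone, PartialEq)]
struct EntryState {
    rows: Vec<String>,
}

/// Hypothetical two-level store: an in-memory cache in front of "remote" storage.
struct Store {
    cache: HashMap<i64, Box<EntryState>>,
    remote: HashMap<i64, EntryState>,
}

impl Store {
    /// Removes the entry from the cache so the caller owns it exclusively.
    /// On a cache miss, loads it from `remote`; if `remote` has nothing,
    /// returns an empty state. `remote` itself is never modified here.
    fn take_state(&mut self, key: i64) -> Box<EntryState> {
        match self.cache.remove(&key) {
            Some(state) => state,
            None => Box::new(self.remote.get(&key).cloned().unwrap_or_default()),
        }
    }

    /// Puts the (possibly modified) state back into the cache.
    fn update_state(&mut self, key: i64, state: Box<EntryState>) {
        self.cache.insert(key, state);
    }
}
```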

async fn fetch_cached_state(&self, key: &K) -> StreamExecutorResult<JoinEntryState>

Fetch the cache entry from the state store. Should only be called if the key does not exist in memory. Returns an empty JoinEntryState even when the state does not exist in remote storage.

pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<JoinHashMapPostCommit<'_, K, S>>

pub async fn try_flush(&mut self) -> StreamExecutorResult<()>

pub fn insert_handle_degree(&mut self, key: &K, value: JoinRow<impl Row>) -> StreamExecutorResult<()>

pub fn insert(&mut self, key: &K, value: JoinRow<impl Row>) -> StreamExecutorResult<()>

Insert a join row.

pub fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()>

Insert a row. Used when the side does not need to update degrees.

pub fn delete_handle_degree(&mut self, key: &K, value: JoinRow<impl Row>) -> StreamExecutorResult<()>

pub fn delete(&mut self, key: &K, value: JoinRow<impl Row>) -> StreamExecutorResult<()>

Delete a join row.

pub fn delete_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()>

Delete a row. Used when the side does not need to update degrees.
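A rough sketch of the difference between the join-row variants (insert/delete, which carry a degree) and the plain-row variants (insert_row/delete_row). It assumes a side is a state table keyed by primary key plus an optional degree table keyed by the same primary key; Side and JoinRowSketch are hypothetical, and the assumption that delete also removes the degree entry is an illustration, not confirmed behavior of the real methods.

```rust
use std::collections::BTreeMap;

/// Hypothetical row with its degree attached (field names are assumptions).
#[derive(Clone, Debug)]
struct JoinRowSketch {
    row: Vec<i64>,
    degree: u64,
}

/// Hypothetical join side: a state table keyed by primary key, plus an
/// optional degree table keyed by the same primary key.
struct Side {
    state: BTreeMap<i64, Vec<i64>>,
    degree: Option<BTreeMap<i64, u64>>,
}

impl Side {
    /// Like `insert`: writes the row and, if a degree table is kept, its degree.
    fn insert(&mut self, pk: i64, value: JoinRowSketch) {
        self.state.insert(pk, value.row);
        if let Some(d) = self.degree.as_mut() {
            d.insert(pk, value.degree);
        }
    }

    /// Like `insert_row`: for sides that do not track degrees, writes only the row.
    fn insert_row(&mut self, pk: i64, row: Vec<i64>) {
        self.state.insert(pk, row);
    }

    /// Like `delete`: removes the row and (by assumption) its degree entry.
    fn delete(&mut self, pk: i64) {
        self.state.remove(&pk);
        if let Some(d) = self.degree.as_mut() {
            d.remove(&pk);
        }
    }

    /// Like `delete_row`: removes only the row.
    fn delete_row(&mut self, pk: i64) {
        self.state.remove(&pk);
    }
}
```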

pub fn update_state(&mut self, key: &K, state: Box<JoinEntryState>)

Put a JoinEntryState (typically one obtained from take_state) back into the hash table.

pub fn evict(&mut self)

Evict entries from the cache.

pub fn entry_count(&self) -> usize

Number of entries currently cached in this hash table.

pub fn null_matched(&self) -> &K::Bitmap

pub fn table_id(&self) -> u32

pub fn join_key_data_types(&self) -> &[DataType]

pub fn check_inequal_key_null(&self, row: &impl Row) -> bool

Return true if the inequality key of the row is null.

Panics

Panics if the inequality key is not set.

pub fn serialize_inequal_key_from_row(&self, row: impl Row) -> Vec<u8>

Serialize the inequality key from a row.

Panics

Panics if the inequality key is not set.
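The documented panic contract can be sketched as an Option that is unwrapped with expect, assuming rows are Vec<Option<i64>> and the inequality key is identified by an optional column index (None for non-AsOf joins). InequalityKey and check_null are hypothetical names, not the real InequalityKeyDesc API.

```rust
/// Hypothetical stand-in: the inequality key is an optional column index.
struct InequalityKey {
    idx: Option<usize>,
}

impl InequalityKey {
    /// Returns true if the inequality key column of `row` is NULL.
    /// Panics if no inequality key is configured, mirroring the documented contract.
    fn check_null(&self, row: &[Option<i64>]) -> bool {
        let idx = self.idx.expect("inequality key is not set");
        row[idx].is_none()
    }
}
```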

pub fn serialize_pk_from_row(&self, row: impl Row) -> Vec<u8>

Auto Trait Implementations

impl<K, S> !Freeze for JoinHashMap<K, S>

impl<K, S> !RefUnwindSafe for JoinHashMap<K, S>

impl<K, S> Send for JoinHashMap<K, S>

impl<K, S> Sync for JoinHashMap<K, S>

impl<K, S> Unpin for JoinHashMap<K, S>
where <K as HashKey>::Bitmap: Unpin, <S as StateStore>::Local: Unpin, S: Unpin,

impl<K, S> !UnwindSafe for JoinHashMap<K, S>
