risingwave_stream::executor::join::hash_join

Function fetch_degrees

async fn fetch_degrees<K: HashKey, S: StateStore>(
    key: &K,
    join_key_data_types: &[DataType],
    degree_state_table: &StateTable<S>,
) -> StreamExecutorResult<Vec<u64>>

We use this to fetch ALL degrees into memory, instead of using a streaming interface. This is necessary because we must update the degree_state_table concurrently. If we obtained the degrees as a stream, we would need to hold an immutable reference to the state table for the stream's entire lifetime, which would prevent us from updating the state table at the same time.
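The borrowing constraint described above can be illustrated with a minimal, self-contained sketch. `MockDegreeTable`, `iter_degrees`, `update_degree`, `fetch_degrees_eager`, and `increment_degrees` are hypothetical names: the mock is a plain synchronous `BTreeMap` keyed by (join key, row id), not RisingWave's async `StateTable`, and unlike the real function it returns (row id, degree) pairs. It shows only why materializing the degrees into a `Vec` first lets the caller take a mutable borrow afterwards.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for `StateTable`: (join_key, row_id) -> degree.
// It models only the borrowing pattern, not RisingWave's real API.
struct MockDegreeTable {
    degrees: BTreeMap<(u64, u64), u64>,
}

impl MockDegreeTable {
    // Streaming-style read: the returned iterator holds `&self` while alive.
    fn iter_degrees(&self, key: u64) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.degrees
            .range((key, 0)..=(key, u64::MAX))
            .map(|(&(_, row), &degree)| (row, degree))
    }

    // Write path: requires `&mut self`.
    fn update_degree(&mut self, key: u64, row: u64, degree: u64) {
        self.degrees.insert((key, row), degree);
    }
}

// Eagerly collect every (row_id, degree) pair for `key` into memory.
// The shared borrow of `table` ends when this function returns.
fn fetch_degrees_eager(table: &MockDegreeTable, key: u64) -> Vec<(u64, u64)> {
    table.iter_degrees(key).collect()
}

// Increment every degree for `key`: fetch first, then mutate.
fn increment_degrees(table: &mut MockDegreeTable, key: u64) {
    let fetched = fetch_degrees_eager(table, key);
    for (row, degree) in fetched {
        table.update_degree(key, row, degree + 1);
    }
}
```

Had `increment_degrees` called `update_degree` inside a loop over a live `iter_degrees` iterator, the compiler would reject it with E0502, because the iterator still holds the shared borrow.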

The cost of fetching all degrees upfront is acceptable: we already do the same in fetch_cached_state. Memory use should stay limited, since each degree is stored as a single u64.

For example, with an amplification of 1B (one billion matched rows for a single join key), we would hold 1B * 8 bytes ≈ 8 GB.
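The estimate above is just the payload size of one `u64` per degree. A tiny helper (the name `degrees_payload_bytes` is hypothetical, for illustration only) makes the arithmetic explicit:

```rust
use std::mem::size_of;

// Payload bytes needed to hold `num_degrees` degrees as u64 values.
// Ignores the Vec header and any capacity over-allocation.
fn degrees_payload_bytes(num_degrees: u64) -> u64 {
    num_degrees * size_of::<u64>() as u64
}
```

For 1B degrees this gives 8,000,000,000 bytes, which is 8 GB in decimal units or about 7.45 GiB. A `Vec` built by collecting an iterator may also reserve more capacity than its length, so actual usage can be somewhat higher.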

If this proves to consume too much memory, a further optimization is to allow breaking up the streaming update, so that the in-memory degrees can be flushed part-way through.
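One possible shape of that optimization, sketched under stated assumptions: degrees live in a plain `BTreeMap` keyed by (join key, row id), a scan can resume just after the last row id it saw, and the `flushes` counter stands in for committing a batch to the state store. `increment_in_batches` and `batch_size` are hypothetical names, and the sketch ignores the consistency questions a real state table would raise when updates are flushed mid-scan. Peak in-memory degrees are bounded by `batch_size` instead of by the amplification.

```rust
use std::collections::BTreeMap;

// Increment all degrees for `key`, holding at most `batch_size` in memory.
// Returns how many batches were "flushed" (a placeholder for a real flush).
fn increment_in_batches(
    table: &mut BTreeMap<(u64, u64), u64>,
    key: u64,
    batch_size: usize,
) -> usize {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut next_row = 0u64;
    let mut flushes = 0;
    loop {
        // Read phase: a shared borrow limited to `batch_size` entries.
        let batch: Vec<(u64, u64)> = table
            .range((key, next_row)..=(key, u64::MAX))
            .take(batch_size)
            .map(|(&(_, row), &degree)| (row, degree))
            .collect();
        if batch.is_empty() {
            break;
        }
        // Write phase: the read borrow has ended, so mutation is allowed.
        for &(row, degree) in &batch {
            table.insert((key, row), degree + 1);
        }
        flushes += 1; // placeholder for flushing this batch
        let last_row = batch.last().unwrap().0;
        if last_row == u64::MAX {
            break; // avoid overflow when resuming
        }
        next_row = last_row + 1;
    }
    flushes
}
```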

TODO(kwannoel): Perhaps we can also cache these separately from matched rows, because matched rows may take up considerably more memory.

Argument for this: we only reach this path on a cache miss, where the fetch is a one-off cost. Keeping degrees cached separately from matched rows is therefore beneficial, because we can then evict matched rows without touching the degrees.
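The TODO above could take roughly the following shape. `SplitJoinCache` and its methods are hypothetical; keys are simplified to `u64` and rows to byte vectors, and a real implementation would also need an eviction policy such as LRU. The point is only that the two maps are independent, so evicting the (large) matched rows leaves the (small) degrees cached.

```rust
use std::collections::HashMap;

// Hypothetical cache with matched rows and degrees stored separately,
// so each can be evicted without affecting the other.
struct SplitJoinCache {
    matched_rows: HashMap<u64, Vec<Vec<u8>>>, // join key -> encoded rows
    degrees: HashMap<u64, Vec<u64>>,          // join key -> one degree per row
}

impl SplitJoinCache {
    fn new() -> Self {
        SplitJoinCache {
            matched_rows: HashMap::new(),
            degrees: HashMap::new(),
        }
    }

    fn insert(&mut self, key: u64, rows: Vec<Vec<u8>>, degrees: Vec<u64>) {
        debug_assert_eq!(rows.len(), degrees.len());
        self.matched_rows.insert(key, rows);
        self.degrees.insert(key, degrees);
    }

    // Drop only the matched rows for `key`; returns whether anything was evicted.
    fn evict_rows(&mut self, key: u64) -> bool {
        self.matched_rows.remove(&key).is_some()
    }

    fn has_rows(&self, key: u64) -> bool {
        self.matched_rows.contains_key(&key)
    }

    // Degrees survive row eviction, so a later miss on rows need not refetch them.
    fn cached_degrees(&self, key: u64) -> Option<&[u64]> {
        self.degrees.get(&key).map(|v| v.as_slice())
    }
}
```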