risingwave_stream/executor/join/row.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt};
16use risingwave_common::types::{DataType, ScalarImpl};
17use risingwave_common_estimate_size::EstimateSize;
18
19use crate::executor::StreamExecutorResult;
20
21pub trait JoinEncoding: 'static + Send + Sync + Default {
22    type EncodedRow: CachedJoinRow + Default;
23    fn encode<R: Row>(row: &JoinRow<R>) -> Self::EncodedRow;
24}
25
26#[derive(Default)]
27pub struct CpuEncoding {}
28
29impl JoinEncoding for CpuEncoding {
30    type EncodedRow = JoinRow<OwnedRow>;
31
32    fn encode<R: Row>(row: &JoinRow<R>) -> JoinRow<OwnedRow> {
33        JoinRow::new(row.row.to_owned_row(), row.degree)
34    }
35}
36
37#[derive(Default)]
38pub struct MemoryEncoding {}
39
40impl JoinEncoding for MemoryEncoding {
41    type EncodedRow = EncodedJoinRow;
42
43    fn encode<R: Row>(row: &JoinRow<R>) -> EncodedJoinRow {
44        EncodedJoinRow {
45            compacted_row: (&row.row).into(),
46            degree: row.degree,
47        }
48    }
49}
50
51/// This is a row with a match degree
52#[derive(Clone, Debug)]
53pub struct JoinRow<R: Row> {
54    pub row: R,
55    pub degree: DegreeType,
56}
57
58impl<R: Row> JoinRow<R> {
59    pub fn new(row: R, degree: DegreeType) -> Self {
60        Self { row, degree }
61    }
62
63    pub fn is_zero_degree(&self) -> bool {
64        self.degree == 0
65    }
66
67    /// Return row and degree in `Row` format. The degree part will be inserted in degree table
68    /// later, so a pk prefix will be added.
69    ///
70    /// * `state_order_key_indices` - the order key of `row`
71    pub fn to_table_rows<'a>(
72        &'a self,
73        state_order_key_indices: &'a [usize],
74    ) -> (&'a R, impl Row + 'a) {
75        let order_key = (&self.row).project(state_order_key_indices);
76        let degree = build_degree_row(order_key, self.degree);
77        (&self.row, degree)
78    }
79}
80
81pub type DegreeType = u64;
82
83fn build_degree_row(order_key: impl Row, degree: DegreeType) -> impl Row {
84    order_key.chain(row::once(Some(ScalarImpl::Int64(degree as i64))))
85}
86
87pub trait CachedJoinRow: EstimateSize + Default + Send + Sync {
88    fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>>;
89
90    fn increase_degree(&mut self);
91
92    fn decrease_degree(&mut self);
93}
94
95#[derive(Clone, Debug, EstimateSize, Default)]
96pub struct EncodedJoinRow {
97    pub compacted_row: CompactedRow,
98    pub degree: DegreeType,
99}
100
101impl CachedJoinRow for EncodedJoinRow {
102    fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
103        let row = self.compacted_row.deserialize(data_types)?;
104        Ok(JoinRow::new(row, self.degree))
105    }
106
107    fn increase_degree(&mut self) {
108        self.degree += 1;
109    }
110
111    fn decrease_degree(&mut self) {
112        self.degree -= 1;
113    }
114}
115
116impl Default for JoinRow<OwnedRow> {
117    fn default() -> JoinRow<OwnedRow> {
118        Self {
119            row: OwnedRow::default(),
120            degree: DegreeType::default(),
121        }
122    }
123}
124
125impl EstimateSize for JoinRow<OwnedRow> {
126    fn estimated_heap_size(&self) -> usize {
127        self.row.estimated_heap_size()
128    }
129}
130
131impl CachedJoinRow for JoinRow<OwnedRow> {
132    fn decode(&self, _data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
133        Ok(self.clone())
134    }
135
136    fn increase_degree(&mut self) {
137        self.degree += 1;
138    }
139
140    fn decrease_degree(&mut self) {
141        self.degree -= 1;
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the inline size of both cached join row representations so that
    /// accidental growth of either type is noticed.
    ///
    /// Both types hold handles to heap data, so their sizes scale with the
    /// target's pointer width. The expected byte counts are 64-bit values,
    /// and the test is compiled only on 64-bit targets.
    #[test]
    #[cfg(target_pointer_width = "64")]
    fn test_cached_join_row_sizes() {
        let encoded_size = size_of::<EncodedJoinRow>();
        let unencoded_size = size_of::<JoinRow<OwnedRow>>();

        assert_eq!(encoded_size, 40, "unexpected size of EncodedJoinRow");
        assert_eq!(unencoded_size, 24, "unexpected size of JoinRow<OwnedRow>");
    }
}