risingwave_stream/executor/join/
row.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt};
17use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};
18use risingwave_common_estimate_size::EstimateSize;
19
20use crate::executor::StreamExecutorResult;
21
22pub trait JoinEncoding: 'static + Send + Sync + Default {
23    type EncodedRow: CachedJoinRow<DecodedRow = Self::DecodedRow> + Default;
24    type DecodedRow: Row;
25
26    fn encode<R: Row>(row: &JoinRow<R>) -> Self::EncodedRow;
27}
28
29#[derive(Default)]
30pub struct CpuEncoding {}
31
32impl JoinEncoding for CpuEncoding {
33    type DecodedRow = OwnedRow;
34    type EncodedRow = JoinRow<OwnedRow>;
35
36    fn encode<R: Row>(row: &JoinRow<R>) -> JoinRow<OwnedRow> {
37        JoinRow::new(row.row.to_owned_row(), row.degree)
38    }
39}
40
41#[derive(Default)]
42pub struct MemoryEncoding {}
43
44impl JoinEncoding for MemoryEncoding {
45    type DecodedRow = OwnedRow;
46    type EncodedRow = EncodedJoinRow;
47
48    fn encode<R: Row>(row: &JoinRow<R>) -> EncodedJoinRow {
49        EncodedJoinRow {
50            compacted_row: (&row.row).into(),
51            degree: row.degree,
52        }
53    }
54}
55
56/// This is a row with a match degree
57#[derive(Clone, Debug)]
58pub struct JoinRow<R: Row> {
59    pub row: R,
60    pub degree: DegreeType,
61}
62
63impl<R: Row> JoinRow<R> {
64    pub fn new(row: R, degree: DegreeType) -> Self {
65        Self { row, degree }
66    }
67
68    pub fn is_zero_degree(&self) -> bool {
69        self.degree == 0
70    }
71
72    /// Return row and degree in `Row` format. The degree part will be inserted in degree table
73    /// later, so a pk prefix will be added.
74    ///
75    /// * `state_order_key_indices` - the order key of `row`
76    /// * `degree_inequality_idx` - optional index of inequality column in row for degree table
77    pub fn to_table_rows<'a>(
78        &'a self,
79        state_order_key_indices: &'a [usize],
80        degree_inequality_idx: Option<usize>,
81    ) -> (&'a R, impl Row + 'a) {
82        let degree = build_degree_row(
83            state_order_key_indices,
84            self.degree,
85            degree_inequality_idx,
86            &self.row,
87        );
88        (&self.row, degree)
89    }
90
91    /// Map the row to another row.
92    pub fn map<R2: Row>(self, f: impl FnOnce(R) -> R2) -> JoinRow<R2> {
93        JoinRow::new(f(self.row), self.degree)
94    }
95}
96
/// Integer type used for the match degree stored alongside a join row.
pub type DegreeType = u64;
98
99pub(crate) fn build_degree_row<R: Row>(
100    order_key_indices: &[usize],
101    degree: DegreeType,
102    inequality_idx: Option<usize>,
103    row: &R,
104) -> impl Row {
105    let order_key = row.project(order_key_indices);
106    let inequality = inequality_idx.map(|idx| row.datum_at(idx).to_owned_datum());
107    let base = order_key.chain(row::once(Some(ScalarImpl::Int64(degree as i64))));
108    if let Some(ineq) = inequality {
109        Either::Right(base.chain(row::once(ineq)))
110    } else {
111        Either::Left(base)
112    }
113}
114
115pub trait CachedJoinRow: EstimateSize + Default + Send + Sync {
116    type DecodedRow: Row;
117
118    fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<Self::DecodedRow>>;
119
120    fn increase_degree(&mut self);
121
122    fn decrease_degree(&mut self);
123}
124
125#[derive(Clone, Debug, EstimateSize, Default)]
126pub struct EncodedJoinRow {
127    pub compacted_row: CompactedRow,
128    pub degree: DegreeType,
129}
130
131impl CachedJoinRow for EncodedJoinRow {
132    type DecodedRow = OwnedRow;
133
134    fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
135        let row = self.compacted_row.deserialize(data_types)?;
136        Ok(JoinRow::new(row, self.degree))
137    }
138
139    fn increase_degree(&mut self) {
140        self.degree += 1;
141    }
142
143    fn decrease_degree(&mut self) {
144        self.degree -= 1;
145    }
146}
147
148impl Default for JoinRow<OwnedRow> {
149    fn default() -> JoinRow<OwnedRow> {
150        Self {
151            row: OwnedRow::default(),
152            degree: DegreeType::default(),
153        }
154    }
155}
156
157impl EstimateSize for JoinRow<OwnedRow> {
158    fn estimated_heap_size(&self) -> usize {
159        self.row.estimated_heap_size()
160    }
161}
162
163impl CachedJoinRow for JoinRow<OwnedRow> {
164    type DecodedRow = OwnedRow;
165
166    fn decode(&self, _data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
167        Ok(self.clone())
168    }
169
170    fn increase_degree(&mut self) {
171        self.degree += 1;
172    }
173
174    fn decrease_degree(&mut self) {
175        self.degree -= 1;
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the in-memory size of both cached row representations so that an
    /// accidental layout change is noticed.
    #[test]
    fn test_cached_join_row_sizes() {
        assert_eq!(std::mem::size_of::<EncodedJoinRow>(), 40);
        assert_eq!(std::mem::size_of::<JoinRow<OwnedRow>>(), 24);
    }
}