risingwave_stream/executor/join/
row.rs1use either::Either;
16use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt};
17use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};
18use risingwave_common_estimate_size::EstimateSize;
19
20use crate::executor::StreamExecutorResult;
21
22pub trait JoinEncoding: 'static + Send + Sync + Default {
23 type EncodedRow: CachedJoinRow<DecodedRow = Self::DecodedRow> + Default;
24 type DecodedRow: Row;
25
26 fn encode<R: Row>(row: &JoinRow<R>) -> Self::EncodedRow;
27}
28
29#[derive(Default)]
30pub struct CpuEncoding {}
31
32impl JoinEncoding for CpuEncoding {
33 type DecodedRow = OwnedRow;
34 type EncodedRow = JoinRow<OwnedRow>;
35
36 fn encode<R: Row>(row: &JoinRow<R>) -> JoinRow<OwnedRow> {
37 JoinRow::new(row.row.to_owned_row(), row.degree)
38 }
39}
40
41#[derive(Default)]
42pub struct MemoryEncoding {}
43
44impl JoinEncoding for MemoryEncoding {
45 type DecodedRow = OwnedRow;
46 type EncodedRow = EncodedJoinRow;
47
48 fn encode<R: Row>(row: &JoinRow<R>) -> EncodedJoinRow {
49 EncodedJoinRow {
50 compacted_row: (&row.row).into(),
51 degree: row.degree,
52 }
53 }
54}
55
56#[derive(Clone, Debug)]
58pub struct JoinRow<R: Row> {
59 pub row: R,
60 pub degree: DegreeType,
61}
62
63impl<R: Row> JoinRow<R> {
64 pub fn new(row: R, degree: DegreeType) -> Self {
65 Self { row, degree }
66 }
67
68 pub fn is_zero_degree(&self) -> bool {
69 self.degree == 0
70 }
71
72 pub fn to_table_rows<'a>(
78 &'a self,
79 state_order_key_indices: &'a [usize],
80 degree_inequality_idx: Option<usize>,
81 ) -> (&'a R, impl Row + 'a) {
82 let degree = build_degree_row(
83 state_order_key_indices,
84 self.degree,
85 degree_inequality_idx,
86 &self.row,
87 );
88 (&self.row, degree)
89 }
90
91 pub fn map<R2: Row>(self, f: impl FnOnce(R) -> R2) -> JoinRow<R2> {
93 JoinRow::new(f(self.row), self.degree)
94 }
95}
96
97pub type DegreeType = u64;
98
99pub(crate) fn build_degree_row<R: Row>(
100 order_key_indices: &[usize],
101 degree: DegreeType,
102 inequality_idx: Option<usize>,
103 row: &R,
104) -> impl Row {
105 let order_key = row.project(order_key_indices);
106 let inequality = inequality_idx.map(|idx| row.datum_at(idx).to_owned_datum());
107 let base = order_key.chain(row::once(Some(ScalarImpl::Int64(degree as i64))));
108 if let Some(ineq) = inequality {
109 Either::Right(base.chain(row::once(ineq)))
110 } else {
111 Either::Left(base)
112 }
113}
114
115pub trait CachedJoinRow: EstimateSize + Default + Send + Sync {
116 type DecodedRow: Row;
117
118 fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<Self::DecodedRow>>;
119
120 fn increase_degree(&mut self);
121
122 fn decrease_degree(&mut self);
123}
124
125#[derive(Clone, Debug, EstimateSize, Default)]
126pub struct EncodedJoinRow {
127 pub compacted_row: CompactedRow,
128 pub degree: DegreeType,
129}
130
131impl CachedJoinRow for EncodedJoinRow {
132 type DecodedRow = OwnedRow;
133
134 fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
135 let row = self.compacted_row.deserialize(data_types)?;
136 Ok(JoinRow::new(row, self.degree))
137 }
138
139 fn increase_degree(&mut self) {
140 self.degree += 1;
141 }
142
143 fn decrease_degree(&mut self) {
144 self.degree -= 1;
145 }
146}
147
148impl Default for JoinRow<OwnedRow> {
149 fn default() -> JoinRow<OwnedRow> {
150 Self {
151 row: OwnedRow::default(),
152 degree: DegreeType::default(),
153 }
154 }
155}
156
157impl EstimateSize for JoinRow<OwnedRow> {
158 fn estimated_heap_size(&self) -> usize {
159 self.row.estimated_heap_size()
160 }
161}
162
163impl CachedJoinRow for JoinRow<OwnedRow> {
164 type DecodedRow = OwnedRow;
165
166 fn decode(&self, _data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
167 Ok(self.clone())
168 }
169
170 fn increase_degree(&mut self) {
171 self.degree += 1;
172 }
173
174 fn decrease_degree(&mut self) {
175 self.degree -= 1;
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the inline sizes of both cached row representations so layout changes get noticed.
    #[test]
    fn test_cached_join_row_sizes() {
        assert_eq!(size_of::<EncodedJoinRow>(), 40);
        assert_eq!(size_of::<JoinRow<OwnedRow>>(), 24);
    }
}