risingwave_stream/executor/join/
row.rs1use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt};
16use risingwave_common::types::{DataType, ScalarImpl};
17use risingwave_common_estimate_size::EstimateSize;
18
19use crate::executor::StreamExecutorResult;
20
21pub trait JoinEncoding: 'static + Send + Sync + Default {
22 type EncodedRow: CachedJoinRow<DecodedRow = Self::DecodedRow> + Default;
23 type DecodedRow: Row;
24
25 fn encode<R: Row>(row: &JoinRow<R>) -> Self::EncodedRow;
26}
27
28#[derive(Default)]
29pub struct CpuEncoding {}
30
31impl JoinEncoding for CpuEncoding {
32 type DecodedRow = OwnedRow;
33 type EncodedRow = JoinRow<OwnedRow>;
34
35 fn encode<R: Row>(row: &JoinRow<R>) -> JoinRow<OwnedRow> {
36 JoinRow::new(row.row.to_owned_row(), row.degree)
37 }
38}
39
40#[derive(Default)]
41pub struct MemoryEncoding {}
42
43impl JoinEncoding for MemoryEncoding {
44 type DecodedRow = OwnedRow;
45 type EncodedRow = EncodedJoinRow;
46
47 fn encode<R: Row>(row: &JoinRow<R>) -> EncodedJoinRow {
48 EncodedJoinRow {
49 compacted_row: (&row.row).into(),
50 degree: row.degree,
51 }
52 }
53}
54
55#[derive(Clone, Debug)]
57pub struct JoinRow<R: Row> {
58 pub row: R,
59 pub degree: DegreeType,
60}
61
62impl<R: Row> JoinRow<R> {
63 pub fn new(row: R, degree: DegreeType) -> Self {
64 Self { row, degree }
65 }
66
67 pub fn is_zero_degree(&self) -> bool {
68 self.degree == 0
69 }
70
71 pub fn to_table_rows<'a>(
76 &'a self,
77 state_order_key_indices: &'a [usize],
78 ) -> (&'a R, impl Row + 'a) {
79 let order_key = (&self.row).project(state_order_key_indices);
80 let degree = build_degree_row(order_key, self.degree);
81 (&self.row, degree)
82 }
83
84 pub fn map<R2: Row>(self, f: impl FnOnce(R) -> R2) -> JoinRow<R2> {
86 JoinRow::new(f(self.row), self.degree)
87 }
88}
89
90pub type DegreeType = u64;
91
92fn build_degree_row(order_key: impl Row, degree: DegreeType) -> impl Row {
93 order_key.chain(row::once(Some(ScalarImpl::Int64(degree as i64))))
94}
95
96pub trait CachedJoinRow: EstimateSize + Default + Send + Sync {
97 type DecodedRow: Row;
98
99 fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<Self::DecodedRow>>;
100
101 fn increase_degree(&mut self);
102
103 fn decrease_degree(&mut self);
104}
105
106#[derive(Clone, Debug, EstimateSize, Default)]
107pub struct EncodedJoinRow {
108 pub compacted_row: CompactedRow,
109 pub degree: DegreeType,
110}
111
112impl CachedJoinRow for EncodedJoinRow {
113 type DecodedRow = OwnedRow;
114
115 fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
116 let row = self.compacted_row.deserialize(data_types)?;
117 Ok(JoinRow::new(row, self.degree))
118 }
119
120 fn increase_degree(&mut self) {
121 self.degree += 1;
122 }
123
124 fn decrease_degree(&mut self) {
125 self.degree -= 1;
126 }
127}
128
129impl Default for JoinRow<OwnedRow> {
130 fn default() -> JoinRow<OwnedRow> {
131 Self {
132 row: OwnedRow::default(),
133 degree: DegreeType::default(),
134 }
135 }
136}
137
138impl EstimateSize for JoinRow<OwnedRow> {
139 fn estimated_heap_size(&self) -> usize {
140 self.row.estimated_heap_size()
141 }
142}
143
144impl CachedJoinRow for JoinRow<OwnedRow> {
145 type DecodedRow = OwnedRow;
146
147 fn decode(&self, _data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
148 Ok(self.clone())
149 }
150
151 fn increase_degree(&mut self) {
152 self.degree += 1;
153 }
154
155 fn decrease_degree(&mut self) {
156 self.degree -= 1;
157 }
158}
159
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the inline size of both cached join row representations so layout regressions
    /// are noticed.
    #[test]
    fn test_cached_join_row_sizes() {
        assert_eq!(size_of::<EncodedJoinRow>(), 40);
        assert_eq!(size_of::<JoinRow<OwnedRow>>(), 24);
    }
}