risingwave_stream/executor/join/
row.rs1use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt};
16use risingwave_common::types::{DataType, ScalarImpl};
17use risingwave_common_estimate_size::EstimateSize;
18
19use crate::executor::StreamExecutorResult;
20
21pub trait JoinEncoding: 'static + Send + Sync + Default {
22 type EncodedRow: CachedJoinRow + Default;
23 fn encode<R: Row>(row: &JoinRow<R>) -> Self::EncodedRow;
24}
25
26#[derive(Default)]
27pub struct CpuEncoding {}
28
29impl JoinEncoding for CpuEncoding {
30 type EncodedRow = JoinRow<OwnedRow>;
31
32 fn encode<R: Row>(row: &JoinRow<R>) -> JoinRow<OwnedRow> {
33 JoinRow::new(row.row.to_owned_row(), row.degree)
34 }
35}
36
37#[derive(Default)]
38pub struct MemoryEncoding {}
39
40impl JoinEncoding for MemoryEncoding {
41 type EncodedRow = EncodedJoinRow;
42
43 fn encode<R: Row>(row: &JoinRow<R>) -> EncodedJoinRow {
44 EncodedJoinRow {
45 compacted_row: (&row.row).into(),
46 degree: row.degree,
47 }
48 }
49}
50
51#[derive(Clone, Debug)]
53pub struct JoinRow<R: Row> {
54 pub row: R,
55 pub degree: DegreeType,
56}
57
58impl<R: Row> JoinRow<R> {
59 pub fn new(row: R, degree: DegreeType) -> Self {
60 Self { row, degree }
61 }
62
63 pub fn is_zero_degree(&self) -> bool {
64 self.degree == 0
65 }
66
67 pub fn to_table_rows<'a>(
72 &'a self,
73 state_order_key_indices: &'a [usize],
74 ) -> (&'a R, impl Row + 'a) {
75 let order_key = (&self.row).project(state_order_key_indices);
76 let degree = build_degree_row(order_key, self.degree);
77 (&self.row, degree)
78 }
79}
80
81pub type DegreeType = u64;
82
83fn build_degree_row(order_key: impl Row, degree: DegreeType) -> impl Row {
84 order_key.chain(row::once(Some(ScalarImpl::Int64(degree as i64))))
85}
86
87pub trait CachedJoinRow: EstimateSize + Default + Send + Sync {
88 fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>>;
89
90 fn increase_degree(&mut self);
91
92 fn decrease_degree(&mut self);
93}
94
95#[derive(Clone, Debug, EstimateSize, Default)]
96pub struct EncodedJoinRow {
97 pub compacted_row: CompactedRow,
98 pub degree: DegreeType,
99}
100
101impl CachedJoinRow for EncodedJoinRow {
102 fn decode(&self, data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
103 let row = self.compacted_row.deserialize(data_types)?;
104 Ok(JoinRow::new(row, self.degree))
105 }
106
107 fn increase_degree(&mut self) {
108 self.degree += 1;
109 }
110
111 fn decrease_degree(&mut self) {
112 self.degree -= 1;
113 }
114}
115
116impl Default for JoinRow<OwnedRow> {
117 fn default() -> JoinRow<OwnedRow> {
118 Self {
119 row: OwnedRow::default(),
120 degree: DegreeType::default(),
121 }
122 }
123}
124
125impl EstimateSize for JoinRow<OwnedRow> {
126 fn estimated_heap_size(&self) -> usize {
127 self.row.estimated_heap_size()
128 }
129}
130
131impl CachedJoinRow for JoinRow<OwnedRow> {
132 fn decode(&self, _data_types: &[DataType]) -> StreamExecutorResult<JoinRow<OwnedRow>> {
133 Ok(self.clone())
134 }
135
136 fn increase_degree(&mut self) {
137 self.degree += 1;
138 }
139
140 fn decrease_degree(&mut self) {
141 self.degree -= 1;
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148
149 #[test]
150 fn test_cached_join_row_sizes() {
151 let encoded_size = size_of::<EncodedJoinRow>();
152 let unencoded_size = size_of::<JoinRow<OwnedRow>>();
153
154 assert_eq!(encoded_size, 40);
155 assert_eq!(unencoded_size, 24);
156 }
157}