risingwave_batch_executors/executor/join/
chunked_data.rs1use std::ops::{Index, IndexMut};
16
17use crate::error::{BatchError, Result};
18
/// Address of a single row: the index of the chunk that holds it plus the
/// row's position inside that chunk.
///
/// Both components are stored as `u32`. The derived `Default` is chunk 0,
/// row 0.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct RowId {
    /// Index of the chunk containing the row.
    chunk_id: u32,
    /// Position of the row within its chunk.
    row_id: u32,
}
25
26#[derive(Debug, Default, PartialEq)]
30pub(super) struct ChunkedData<V> {
31 data: Vec<V>,
32 chunk_offsets: Vec<usize>,
33}
34
35pub(super) struct AllRowIdIter<'a> {
36 cur: RowId,
37 chunk_offsets: &'a [usize],
38}
39
40impl Iterator for AllRowIdIter<'_> {
41 type Item = RowId;
42
43 fn next(&mut self) -> Option<Self::Item> {
44 if (self.cur.chunk_id() + 1) >= self.chunk_offsets.len() {
45 None
46 } else {
47 let ret = Some(self.cur);
48 let current_chunk_row_count = self.chunk_offsets[self.cur.chunk_id() + 1]
49 - self.chunk_offsets[self.cur.chunk_id()];
50 self.cur = self.cur.next_row(current_chunk_row_count);
51 ret
52 }
53 }
54}
55
56impl RowId {
57 pub(super) fn new(chunk_id: usize, row_id: usize) -> Self {
58 Self {
59 chunk_id: chunk_id as u32,
60 row_id: row_id as u32,
61 }
62 }
63
64 #[inline(always)]
65 pub(super) fn chunk_id(&self) -> usize {
66 self.chunk_id as usize
67 }
68
69 #[inline(always)]
70 pub(super) fn row_id(&self) -> usize {
71 self.row_id as usize
72 }
73
74 #[inline(always)]
75 pub(super) fn next_row(self, cur_chunk_row_count: usize) -> RowId {
76 if (self.row_id + 1) >= (cur_chunk_row_count as u32) {
77 RowId {
78 chunk_id: self.chunk_id + 1,
79 row_id: 0,
80 }
81 } else {
82 RowId {
83 chunk_id: self.chunk_id,
84 row_id: self.row_id + 1,
85 }
86 }
87 }
88}
89
90impl<V> ChunkedData<V> {
91 pub(super) fn with_chunk_sizes<C>(chunk_sizes: C) -> Result<Self>
92 where
93 C: IntoIterator<Item = usize>,
94 V: Default,
95 {
96 let chunk_sizes = chunk_sizes.into_iter();
97 let mut chunk_offsets = Vec::with_capacity(chunk_sizes.size_hint().0 + 1);
98 let mut cur = 0usize;
99 chunk_offsets.push(0);
100 for chunk_size in chunk_sizes {
101 ensure!(chunk_size > 0, "Chunk size can't be zero!");
102 cur += chunk_size;
103 chunk_offsets.push(cur);
104 }
105
106 let mut data = Vec::with_capacity(cur);
107 data.resize_with(cur, V::default);
108
109 Ok(Self {
110 data,
111 chunk_offsets,
112 })
113 }
114
115 fn index_in_data(&self, index: RowId) -> usize {
116 self.chunk_offsets[index.chunk_id()] + index.row_id()
117 }
118
119 pub(super) fn all_row_ids(&self) -> impl Iterator<Item = RowId> + '_ {
120 AllRowIdIter {
121 cur: RowId::default(),
122 chunk_offsets: &self.chunk_offsets,
123 }
124 }
125}
126
127impl<V> Index<RowId> for ChunkedData<V> {
128 type Output = V;
129
130 fn index(&self, index: RowId) -> &V {
131 &self.data[self.index_in_data(index)]
132 }
133}
134
135impl<V> IndexMut<RowId> for ChunkedData<V> {
136 fn index_mut(&mut self, index: RowId) -> &mut V {
137 let index_in_data = self.index_in_data(index);
138 &mut self.data[index_in_data]
139 }
140}
141
142impl<V> TryFrom<Vec<Vec<V>>> for ChunkedData<V> {
143 type Error = BatchError;
144
145 fn try_from(value: Vec<Vec<V>>) -> Result<Self> {
146 let chunk_offsets = std::iter::once(Ok(0))
147 .chain(value.iter().map(|chunk| -> Result<usize> {
148 ensure!(!chunk.is_empty(), "Chunk size can't be zero!");
149 Ok(chunk.len())
150 }))
151 .try_collect()?;
152 let data = value.into_iter().flatten().collect();
153 Ok(Self {
154 data,
155 chunk_offsets,
156 })
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163
164 #[test]
165 fn test_all_row_ids() {
166 let chunk_sizes = vec![4, 3, 1, 2usize];
167
168 let chunked_data =
169 ChunkedData::<()>::with_chunk_sizes(chunk_sizes).expect("Build chunked data.");
170 let expected_all_row_ids = vec![
171 RowId::new(0, 0),
172 RowId::new(0, 1),
173 RowId::new(0, 2),
174 RowId::new(0, 3),
175 RowId::new(1, 0),
176 RowId::new(1, 1),
177 RowId::new(1, 2),
178 RowId::new(2, 0),
179 RowId::new(3, 0),
180 RowId::new(3, 1),
181 ];
182
183 assert_eq!(
184 expected_all_row_ids,
185 chunked_data.all_row_ids().collect::<Vec<RowId>>()
186 );
187 }
188
189 #[test]
190 fn test_indexes() {
191 let chunk_sizes = vec![4, 3, 1, 2usize];
192
193 let mut chunked_data =
194 ChunkedData::<usize>::with_chunk_sizes(chunk_sizes).expect("Build chunked data.");
195
196 let row_ids = vec![
197 RowId::new(0, 3),
198 RowId::new(1, 1),
199 RowId::new(2, 0),
200 RowId::new(3, 1),
201 ];
202
203 for row_id in &row_ids {
204 chunked_data[*row_id] = row_id.chunk_id() + row_id.row_id();
205 }
206
207 for row_id in &row_ids {
208 let expected = row_id.chunk_id() + row_id.row_id();
209 assert_eq!(expected, chunked_data[*row_id]);
210 }
211 }
212
213 #[test]
214 fn test_try_from() {
215 assert_eq!(
216 ChunkedData {
217 data: vec![1, 2, 3, 4, 5, 6, 7, 9, 8, 7, 6, 5, 123],
218 chunk_offsets: vec![0, 4, 3, 5, 1],
219 },
220 ChunkedData::try_from(vec![
221 vec![1, 2, 3, 4],
222 vec![5, 6, 7],
223 vec![9, 8, 7, 6, 5],
224 vec![123],
225 ])
226 .unwrap()
227 );
228 }
229
230 #[test]
231 #[should_panic]
232 fn test_zero_chunk_size_should_fail() {
233 let chunk_sizes = vec![4, 3, 0, 1, 2usize];
234 ChunkedData::<()>::with_chunk_sizes(chunk_sizes).unwrap();
235 }
236
237 #[test]
238 #[should_panic]
239 fn test_try_from_zero_chunk_size_should_fail() {
240 let chunks = vec![vec![0; 4], vec![0; 3], vec![], vec![0; 1], vec![0, 2]];
241 ChunkedData::try_from(chunks).unwrap();
242 }
243}