risingwave_batch_executors/executor/join/chunked_data.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Index, IndexMut};

use crate::error::{BatchError, Result};

/// Id of one row in chunked data.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
pub struct RowId {
    chunk_id: u32,
    row_id: u32,
}

/// [`ChunkedData`] is in fact a list of lists.
///
/// We use this data structure instead of [`Vec<Vec<V>>`] to reduce the number of allocations.
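///
/// Internally, `chunk_offsets` holds the prefix sums of the chunk sizes with a
/// leading 0, so chunk `i` occupies `data[chunk_offsets[i]..chunk_offsets[i + 1]]`.
/// For example, chunks of sizes `[2, 3]` are stored as a flat `data` of 5 values
/// with `chunk_offsets = [0, 2, 5]`.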
#[derive(Debug, Default, PartialEq)]
pub(super) struct ChunkedData<V> {
    data: Vec<V>,
    chunk_offsets: Vec<usize>,
}

/// Iterator over all [`RowId`]s in a [`ChunkedData`], in chunk order and then row order.
pub(super) struct AllRowIdIter<'a> {
    cur: RowId,
    chunk_offsets: &'a [usize],
}

impl Iterator for AllRowIdIter<'_> {
    type Item = RowId;

    fn next(&mut self) -> Option<Self::Item> {
        if (self.cur.chunk_id() + 1) >= self.chunk_offsets.len() {
            None
        } else {
            let ret = Some(self.cur);
            let current_chunk_row_count = self.chunk_offsets[self.cur.chunk_id() + 1]
                - self.chunk_offsets[self.cur.chunk_id()];
            self.cur = self.cur.next_row(current_chunk_row_count);
            ret
        }
    }
}

impl RowId {
    pub(super) fn new(chunk_id: usize, row_id: usize) -> Self {
        Self {
            chunk_id: chunk_id as u32,
            row_id: row_id as u32,
        }
    }

    #[inline(always)]
    pub(super) fn chunk_id(&self) -> usize {
        self.chunk_id as usize
    }

    #[inline(always)]
    pub(super) fn row_id(&self) -> usize {
        self.row_id as usize
    }

    #[inline(always)]
    pub(super) fn next_row(self, cur_chunk_row_count: usize) -> RowId {
        if (self.row_id + 1) >= (cur_chunk_row_count as u32) {
            RowId {
                chunk_id: self.chunk_id + 1,
                row_id: 0,
            }
        } else {
            RowId {
                chunk_id: self.chunk_id,
                row_id: self.row_id + 1,
            }
        }
    }
}
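
// Example of `next_row` (illustrative): within a chunk of 3 rows,
// `RowId::new(0, 1).next_row(3)` is `RowId::new(0, 2)`, while
// `RowId::new(0, 2).next_row(3)` rolls over to `RowId::new(1, 0)`, the first
// row of the next chunk.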

impl<V> ChunkedData<V> {
    pub(super) fn with_chunk_sizes<C>(chunk_sizes: C) -> Result<Self>
    where
        C: IntoIterator<Item = usize>,
        V: Default,
    {
        let chunk_sizes = chunk_sizes.into_iter();
        let mut chunk_offsets = Vec::with_capacity(chunk_sizes.size_hint().0 + 1);
        let mut cur = 0usize;
        chunk_offsets.push(0);
        for chunk_size in chunk_sizes {
            ensure!(chunk_size > 0, "Chunk size can't be zero!");
            cur += chunk_size;
            chunk_offsets.push(cur);
        }

        let mut data = Vec::with_capacity(cur);
        data.resize_with(cur, V::default);

        Ok(Self {
            data,
            chunk_offsets,
        })
    }

    fn index_in_data(&self, index: RowId) -> usize {
        self.chunk_offsets[index.chunk_id()] + index.row_id()
    }

    pub(super) fn all_row_ids(&self) -> impl Iterator<Item = RowId> + '_ {
        AllRowIdIter {
            cur: RowId::default(),
            chunk_offsets: &self.chunk_offsets,
        }
    }
}
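
// Usage sketch (hypothetical caller, not part of this module): allocate per-row
// state for two chunks of known sizes, then address it by `RowId`:
//
//     let mut visited = ChunkedData::<bool>::with_chunk_sizes(vec![1024, 512]).unwrap();
//     visited[RowId::new(1, 7)] = true;
//     assert!(visited[RowId::new(1, 7)]);
//     assert_eq!(1536, visited.all_row_ids().count());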

impl<V> Index<RowId> for ChunkedData<V> {
    type Output = V;

    fn index(&self, index: RowId) -> &V {
        &self.data[self.index_in_data(index)]
    }
}

impl<V> IndexMut<RowId> for ChunkedData<V> {
    fn index_mut(&mut self, index: RowId) -> &mut V {
        let index_in_data = self.index_in_data(index);
        &mut self.data[index_in_data]
    }
}

impl<V> TryFrom<Vec<Vec<V>>> for ChunkedData<V> {
    type Error = BatchError;

    fn try_from(value: Vec<Vec<V>>) -> Result<Self> {
        // `chunk_offsets` must hold the prefix sums of the chunk sizes (with a
        // leading 0), matching the layout built by `with_chunk_sizes`.
        let mut chunk_offsets = Vec::with_capacity(value.len() + 1);
        let mut cur = 0usize;
        chunk_offsets.push(0);
        for chunk in &value {
            ensure!(!chunk.is_empty(), "Chunk size can't be zero!");
            cur += chunk.len();
            chunk_offsets.push(cur);
        }
        let data = value.into_iter().flatten().collect();
        Ok(Self {
            data,
            chunk_offsets,
        })
    }
}
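
// For example (illustrative): `ChunkedData::try_from(vec![vec![1, 2], vec![3]])`
// yields `data = [1, 2, 3]` and `chunk_offsets = [0, 2, 3]`.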

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_all_row_ids() {
        let chunk_sizes = vec![4, 3, 1, 2usize];

        let chunked_data =
            ChunkedData::<()>::with_chunk_sizes(chunk_sizes).expect("Build chunked data.");
        let expected_all_row_ids = vec![
            RowId::new(0, 0),
            RowId::new(0, 1),
            RowId::new(0, 2),
            RowId::new(0, 3),
            RowId::new(1, 0),
            RowId::new(1, 1),
            RowId::new(1, 2),
            RowId::new(2, 0),
            RowId::new(3, 0),
            RowId::new(3, 1),
        ];

        assert_eq!(
            expected_all_row_ids,
            chunked_data.all_row_ids().collect::<Vec<RowId>>()
        );
    }

    #[test]
    fn test_indexes() {
        let chunk_sizes = vec![4, 3, 1, 2usize];

        let mut chunked_data =
            ChunkedData::<usize>::with_chunk_sizes(chunk_sizes).expect("Build chunked data.");

        let row_ids = vec![
            RowId::new(0, 3),
            RowId::new(1, 1),
            RowId::new(2, 0),
            RowId::new(3, 1),
        ];

        for row_id in &row_ids {
            chunked_data[*row_id] = row_id.chunk_id() + row_id.row_id();
        }

        for row_id in &row_ids {
            let expected = row_id.chunk_id() + row_id.row_id();
            assert_eq!(expected, chunked_data[*row_id]);
        }
    }

    #[test]
    fn test_try_from() {
        assert_eq!(
            ChunkedData {
                data: vec![1, 2, 3, 4, 5, 6, 7, 9, 8, 7, 6, 5, 123],
                chunk_offsets: vec![0, 4, 7, 12, 13],
            },
            ChunkedData::try_from(vec![
                vec![1, 2, 3, 4],
                vec![5, 6, 7],
                vec![9, 8, 7, 6, 5],
                vec![123],
            ])
            .unwrap()
        );
    }

    #[test]
    #[should_panic]
    fn test_zero_chunk_size_should_fail() {
        let chunk_sizes = vec![4, 3, 0, 1, 2usize];
        ChunkedData::<()>::with_chunk_sizes(chunk_sizes).unwrap();
    }

    #[test]
    #[should_panic]
    fn test_try_from_zero_chunk_size_should_fail() {
        let chunks = vec![vec![0; 4], vec![0; 3], vec![], vec![0; 1], vec![0; 2]];
        ChunkedData::try_from(chunks).unwrap();
    }
}