risingwave_batch_executors/executor/utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::ops::{Bound, RangeBounds};

use futures::StreamExt;
use futures::stream::BoxStream;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::DataType;
use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::batch_plan::{PbScanRange, scan_range};
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::StateStore;
use risingwave_storage::table::batch_table::BatchTable;

use crate::error::{BatchError, Result};
use crate::executor::{BoxedDataChunkStream, Executor};

pub type BoxedDataChunkListStream = BoxStream<'static, Result<Vec<DataChunk>>>;

/// Accumulate chunks from `stream` and yield them in batches that each contain at least
/// `rows` rows; the final batch may contain fewer.
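///
/// A minimal usage sketch; `child` (an upstream [`Executor`]) and `batch_size` are
/// placeholders, not part of this module:
///
/// ```ignore
/// let mut batches = batch_read(child.execute(), batch_size);
/// while let Some(chunks) = batches.next().await {
///     // Each item is a `Vec<DataChunk>` holding at least `batch_size` rows,
///     // except possibly the final one.
///     let chunks = chunks?;
///     // ... consume the accumulated chunks as one unit ...
/// }
/// ```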
#[try_stream(boxed, ok = Vec<DataChunk>, error = BatchError)]
pub async fn batch_read(mut stream: BoxedDataChunkStream, rows: usize) {
    let mut cnt = 0;
    let mut chunk_list = vec![];
    while let Some(build_chunk) = stream.next().await {
        let build_chunk = build_chunk?;
        cnt += build_chunk.cardinality();
        chunk_list.push(build_chunk);
        if cnt >= rows {
            yield chunk_list;
            cnt = 0;
            chunk_list = vec![];
        }
    }
    if !chunk_list.is_empty() {
        yield chunk_list;
    }
}

/// An executor that simply yields a pre-buffered list of [`DataChunk`]s.
pub struct BufferChunkExecutor {
    schema: Schema,
    chunk_list: Vec<DataChunk>,
}

impl Executor for BufferChunkExecutor {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        "BufferChunkExecutor"
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}

impl BufferChunkExecutor {
    pub fn new(schema: Schema, chunk_list: Vec<DataChunk>) -> Self {
        Self { schema, chunk_list }
    }

    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self) {
        for chunk in self.chunk_list {
            yield chunk
        }
    }
}

/// An executor that produces an empty stream (no chunks).
pub struct DummyExecutor {
    pub schema: Schema,
}

impl Executor for DummyExecutor {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        "dummy"
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        DummyExecutor::do_nothing()
    }
}

impl DummyExecutor {
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_nothing() {}
}

/// An executor that wraps an existing [`BoxedDataChunkStream`] together with its schema.
pub struct WrapStreamExecutor {
    schema: Schema,
    stream: BoxedDataChunkStream,
}

impl WrapStreamExecutor {
    pub fn new(schema: Schema, stream: BoxedDataChunkStream) -> Self {
        Self { schema, stream }
    }
}

impl Executor for WrapStreamExecutor {
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        "WrapStreamExecutor"
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.stream
    }
}

/// Range for batch scan.
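///
/// As an illustration only: for a hypothetical primary key `(a, b)` of two `Int32`
/// columns, a predicate like `a = 1 AND b > 10` would correspond to
///
/// ```ignore
/// ScanRange {
///     // equality condition on the leading pk column `a`
///     pk_prefix: OwnedRow::new(vec![Some(ScalarImpl::Int32(1))]),
///     // range condition on the next pk column `b`
///     next_col_bounds: (
///         Bound::Excluded(OwnedRow::new(vec![Some(ScalarImpl::Int32(10))])),
///         Bound::Unbounded,
///     ),
/// }
/// ```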
pub struct ScanRange {
    /// The prefix of the primary key.
    pub pk_prefix: OwnedRow,

    /// The range bounds of the next column.
    pub next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>),
}

impl ScanRange {
    /// Create a scan range from the prost representation.
    pub fn new(scan_range: PbScanRange, pk_types: Vec<DataType>) -> Result<Self> {
        let mut index = 0;
        let pk_prefix = OwnedRow::new(
            scan_range
                .eq_conds
                .iter()
                .map(|v| {
                    let ty = pk_types.get(index).unwrap();
                    index += 1;
                    deserialize_datum(v.as_slice(), ty)
                })
                .try_collect()?,
        );
        if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
            return Ok(Self {
                pk_prefix,
                ..Self::full()
            });
        }

        let build_bound = |bound: &scan_range::Bound, mut index| -> Result<Bound<OwnedRow>> {
            let next_col_bounds = OwnedRow::new(
                bound
                    .value
                    .iter()
                    .map(|v| {
                        let ty = pk_types.get(index).unwrap();
                        index += 1;
                        deserialize_datum(v.as_slice(), ty)
                    })
                    .try_collect()?,
            );
            if bound.inclusive {
                Ok(Bound::Included(next_col_bounds))
            } else {
                Ok(Bound::Excluded(next_col_bounds))
            }
        };

        let next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>) = match (
            scan_range.lower_bound.as_ref(),
            scan_range.upper_bound.as_ref(),
        ) {
            (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?),
            (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?),
            (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded),
            (None, None) => unreachable!(),
        };
        Ok(Self {
            pk_prefix,
            next_col_bounds,
        })
    }

    /// Create a scan range for a full table scan.
    pub fn full() -> Self {
        Self {
            pk_prefix: OwnedRow::default(),
            next_col_bounds: (Bound::Unbounded, Bound::Unbounded),
        }
    }

    /// Convert this scan range into [`RangeBounds`] over the remaining pk columns for a
    /// storage scan. The two bounds are swapped when the next pk column is ordered
    /// descending, and an unbounded side is tightened to exclude `NULL`s whenever the other
    /// side is bounded and `NULL`s sort on the unbounded side, matching SQL semantics: e.g.
    /// for an ascending `NULLS LAST` column with bounds `(Included(10), Unbounded)`, the end
    /// bound becomes `Excluded([NULL])` so that `NULL` rows are not returned.
    pub fn convert_to_range_bounds<S: StateStore>(
        self,
        table: &BatchTable<S>,
    ) -> impl RangeBounds<OwnedRow> {
        let ScanRange {
            pk_prefix,
            next_col_bounds,
        } = self;

        // The length of a valid `pk_prefix` should be less than or equal to the number of pk columns.
        let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
        let (start_bound, end_bound) = if order_type.is_ascending() {
            (next_col_bounds.0, next_col_bounds.1)
        } else {
            (next_col_bounds.1, next_col_bounds.0)
        };

        let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
        let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);

        let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| {
            match bound {
                Bound::Unbounded => {
                    if other_bound_is_bounded && order_type_nulls {
                        // `NULL`s sort on this side of the range; exclude them to meet SQL semantics.
                        Bound::Excluded(OwnedRow::new(vec![None]))
                    } else {
                        // Keep the bound open: either the whole range is unbounded, or `NULL`s sort on the other side.
                        Bound::Unbounded
                    }
                }
                Bound::Included(x) => Bound::Included(x),
                Bound::Excluded(x) => Bound::Excluded(x),
            }
        };
        let start_bound = build_bound(
            end_bound_is_bounded,
            start_bound,
            order_type.nulls_are_first(),
        );
        let end_bound = build_bound(
            start_bound_is_bounded,
            end_bound,
            order_type.nulls_are_last(),
        );
        (start_bound, end_bound)
    }
}

/// Build [`ScanRange`]s from their protobuf representation, falling back to a single
/// full-table scan range when no ranges are given.
pub fn build_scan_ranges_from_pb(
    scan_ranges: &Vec<PbScanRange>,
    table_desc: &StorageTableDesc,
) -> Result<Vec<ScanRange>> {
    if scan_ranges.is_empty() {
        Ok(vec![ScanRange::full()])
    } else {
        Ok(scan_ranges
            .iter()
            .map(|scan_range| build_scan_range_from_pb(scan_range, table_desc))
            .try_collect()?)
    }
}

/// Build a single [`ScanRange`] from its protobuf representation, resolving the pk column
/// types from the table descriptor.
pub fn build_scan_range_from_pb(
    scan_range: &PbScanRange,
    table_desc: &StorageTableDesc,
) -> Result<ScanRange> {
    let pk_types = table_desc
        .pk
        .iter()
        .map(|order| {
            DataType::from(
                table_desc.columns[order.column_index as usize]
                    .column_type
                    .as_ref()
                    .unwrap(),
            )
        })
        .collect_vec();
    ScanRange::new(scan_range.clone(), pk_types)
}