// risingwave_batch_executors/executor/utils.rs
use core::ops::{Bound, RangeBounds};
16
17use futures::StreamExt;
18use futures::stream::BoxStream;
19use futures_async_stream::try_stream;
20use itertools::Itertools;
21use risingwave_common::array::DataChunk;
22use risingwave_common::catalog::Schema;
23use risingwave_common::row::{OwnedRow, Row};
24use risingwave_common::types::DataType;
25use risingwave_common::util::value_encoding::deserialize_datum;
26use risingwave_pb::batch_plan::{PbScanRange, scan_range};
27use risingwave_pb::plan_common::StorageTableDesc;
28use risingwave_storage::StateStore;
29use risingwave_storage::table::batch_table::BatchTable;
30
31use crate::error::{BatchError, Result};
32use crate::executor::{BoxedDataChunkStream, Executor};
33
/// Boxed `'static` stream whose items are `Result<Vec<DataChunk>>`, i.e. each successful
/// item is a list of data chunks. This is the item shape produced by `batch_read`.
pub type BoxedDataChunkListStream = BoxStream<'static, Result<Vec<DataChunk>>>;
35
36#[try_stream(boxed, ok = Vec<DataChunk>, error = BatchError)]
38pub async fn batch_read(mut stream: BoxedDataChunkStream, rows: usize) {
39 let mut cnt = 0;
40 let mut chunk_list = vec![];
41 while let Some(build_chunk) = stream.next().await {
42 let build_chunk = build_chunk?;
43 cnt += build_chunk.cardinality();
44 chunk_list.push(build_chunk);
45 if cnt < rows {
46 continue;
47 } else {
48 yield chunk_list;
49 cnt = 0;
50 chunk_list = vec![];
51 }
52 }
53 if !chunk_list.is_empty() {
54 yield chunk_list;
55 }
56}
57
58pub struct BufferChunkExecutor {
59 schema: Schema,
60 chunk_list: Vec<DataChunk>,
61}
62
63impl Executor for BufferChunkExecutor {
64 fn schema(&self) -> &Schema {
65 &self.schema
66 }
67
68 fn identity(&self) -> &str {
69 "BufferChunkExecutor"
70 }
71
72 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
73 self.do_execute()
74 }
75}
76
77impl BufferChunkExecutor {
78 pub fn new(schema: Schema, chunk_list: Vec<DataChunk>) -> Self {
79 Self { schema, chunk_list }
80 }
81
82 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
83 async fn do_execute(self) {
84 for chunk in self.chunk_list {
85 yield chunk
86 }
87 }
88}
89
90pub struct DummyExecutor {
91 pub schema: Schema,
92}
93
94impl Executor for DummyExecutor {
95 fn schema(&self) -> &Schema {
96 &self.schema
97 }
98
99 fn identity(&self) -> &str {
100 "dummy"
101 }
102
103 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
104 DummyExecutor::do_nothing()
105 }
106}
107
108impl DummyExecutor {
109 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
110 async fn do_nothing() {}
111}
112
113pub struct WrapStreamExecutor {
114 schema: Schema,
115 stream: BoxedDataChunkStream,
116}
117
118impl WrapStreamExecutor {
119 pub fn new(schema: Schema, stream: BoxedDataChunkStream) -> Self {
120 Self { schema, stream }
121 }
122}
123
124impl Executor for WrapStreamExecutor {
125 fn schema(&self) -> &Schema {
126 &self.schema
127 }
128
129 fn identity(&self) -> &str {
130 "WrapStreamExecutor"
131 }
132
133 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
134 self.stream
135 }
136}
137
138pub struct ScanRange {
140 pub pk_prefix: OwnedRow,
142
143 pub next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>),
145}
146impl ScanRange {
147 pub fn new(scan_range: PbScanRange, pk_types: Vec<DataType>) -> Result<Self> {
149 let mut index = 0;
150 let pk_prefix = OwnedRow::new(
151 scan_range
152 .eq_conds
153 .iter()
154 .map(|v| {
155 let ty = pk_types.get(index).unwrap();
156 index += 1;
157 deserialize_datum(v.as_slice(), ty)
158 })
159 .try_collect()?,
160 );
161 if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
162 return Ok(Self {
163 pk_prefix,
164 ..Self::full()
165 });
166 }
167
168 let build_bound = |bound: &scan_range::Bound, mut index| -> Result<Bound<OwnedRow>> {
169 let next_col_bounds = OwnedRow::new(
170 bound
171 .value
172 .iter()
173 .map(|v| {
174 let ty = pk_types.get(index).unwrap();
175 index += 1;
176 deserialize_datum(v.as_slice(), ty)
177 })
178 .try_collect()?,
179 );
180 if bound.inclusive {
181 Ok(Bound::Included(next_col_bounds))
182 } else {
183 Ok(Bound::Excluded(next_col_bounds))
184 }
185 };
186
187 let next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>) = match (
188 scan_range.lower_bound.as_ref(),
189 scan_range.upper_bound.as_ref(),
190 ) {
191 (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?),
192 (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?),
193 (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded),
194 (None, None) => unreachable!(),
195 };
196 Ok(Self {
197 pk_prefix,
198 next_col_bounds,
199 })
200 }
201
202 pub fn full() -> Self {
204 Self {
205 pk_prefix: OwnedRow::default(),
206 next_col_bounds: (Bound::Unbounded, Bound::Unbounded),
207 }
208 }
209
210 pub fn convert_to_range_bounds<S: StateStore>(
211 self,
212 table: &BatchTable<S>,
213 ) -> impl RangeBounds<OwnedRow> {
214 let ScanRange {
215 pk_prefix,
216 next_col_bounds,
217 } = self;
218
219 let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
221 let (start_bound, end_bound) = if order_type.is_ascending() {
222 (next_col_bounds.0, next_col_bounds.1)
223 } else {
224 (next_col_bounds.1, next_col_bounds.0)
225 };
226
227 let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
228 let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);
229
230 let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| {
231 match bound {
232 Bound::Unbounded => {
233 if other_bound_is_bounded && order_type_nulls {
234 Bound::Excluded(OwnedRow::new(vec![None]))
236 } else {
237 Bound::Unbounded
239 }
240 }
241 Bound::Included(x) => Bound::Included(x),
242 Bound::Excluded(x) => Bound::Excluded(x),
243 }
244 };
245 let start_bound = build_bound(
246 end_bound_is_bounded,
247 start_bound,
248 order_type.nulls_are_first(),
249 );
250 let end_bound = build_bound(
251 start_bound_is_bounded,
252 end_bound,
253 order_type.nulls_are_last(),
254 );
255 (start_bound, end_bound)
256 }
257}
258
259pub fn build_scan_ranges_from_pb(
260 scan_ranges: &Vec<PbScanRange>,
261 table_desc: &StorageTableDesc,
262) -> Result<Vec<ScanRange>> {
263 if scan_ranges.is_empty() {
264 Ok(vec![ScanRange::full()])
265 } else {
266 Ok(scan_ranges
267 .iter()
268 .map(|scan_range| build_scan_range_from_pb(scan_range, table_desc))
269 .try_collect()?)
270 }
271}
272
273pub fn build_scan_range_from_pb(
274 scan_range: &PbScanRange,
275 table_desc: &StorageTableDesc,
276) -> Result<ScanRange> {
277 let pk_types = table_desc
278 .pk
279 .iter()
280 .map(|order| {
281 DataType::from(
282 table_desc.columns[order.column_index as usize]
283 .column_type
284 .as_ref()
285 .unwrap(),
286 )
287 })
288 .collect_vec();
289 ScanRange::new(scan_range.clone(), pk_types)
290}