risingwave_batch_executors/executor/
sys_row_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::DataChunk;
18use risingwave_common::catalog::{ColumnDesc, Schema, SysCatalogReaderRef, TableId};
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20
21use crate::error::{BatchError, Result};
22use crate::executor::{
23    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
24};
25
26pub struct SysRowSeqScanExecutor {
27    table_id: TableId,
28    schema: Schema,
29    column_indices: Vec<usize>,
30    identity: String,
31
32    sys_catalog_reader: SysCatalogReaderRef,
33}
34
35impl SysRowSeqScanExecutor {
36    pub fn new(
37        table_id: TableId,
38        schema: Schema,
39        column_indices: Vec<usize>,
40        identity: String,
41        sys_catalog_reader: SysCatalogReaderRef,
42    ) -> Self {
43        Self {
44            table_id,
45            schema,
46            column_indices,
47            identity,
48            sys_catalog_reader,
49        }
50    }
51}
52
53pub struct SysRowSeqScanExecutorBuilder {}
54
55impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder {
56    async fn new_boxed_executor(
57        source: &ExecutorBuilder<'_>,
58        inputs: Vec<BoxedExecutor>,
59    ) -> Result<BoxedExecutor> {
60        ensure!(
61            inputs.is_empty(),
62            "Row sequential scan should not have input executor!"
63        );
64        let seq_scan_node = try_match_expand!(
65            source.plan_node().get_node_body().unwrap(),
66            NodeBody::SysRowSeqScan
67        )?;
68        let sys_catalog_reader = source.context().catalog_reader();
69
70        let table_id = seq_scan_node.get_table_id().into();
71        let column_descs = seq_scan_node
72            .column_descs
73            .iter()
74            .map(|column_desc| ColumnDesc::from(column_desc.clone()))
75            .collect_vec();
76
77        let column_indices = column_descs
78            .iter()
79            .map(|d| d.column_id.get_id() as usize)
80            .collect_vec();
81        let schema = Schema::new(column_descs.iter().map(Into::into).collect_vec());
82        Ok(Box::new(SysRowSeqScanExecutor::new(
83            table_id,
84            schema,
85            column_indices,
86            source.plan_node().get_identity().clone(),
87            sys_catalog_reader,
88        )))
89    }
90}
91
92impl Executor for SysRowSeqScanExecutor {
93    fn schema(&self) -> &Schema {
94        &self.schema
95    }
96
97    fn identity(&self) -> &str {
98        &self.identity
99    }
100
101    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
102        self.do_execute()
103    }
104}
105
106impl SysRowSeqScanExecutor {
107    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
108    async fn do_execute(self: Box<Self>) {
109        #[for_await]
110        for chunk in self.sys_catalog_reader.read_table(self.table_id) {
111            let chunk = chunk.map_err(BatchError::SystemTable)?;
112            yield chunk.project(&self.column_indices);
113        }
114    }
115}