risingwave_batch_executors/executor/
sys_row_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::DataChunk;
18use risingwave_common::catalog::{Schema, SysCatalogReaderRef, TableId};
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20
21use crate::error::{BatchError, Result};
22use crate::executor::{
23    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
24};
25
26pub struct SysRowSeqScanExecutor {
27    table_id: TableId,
28    schema: Schema,
29    column_indices: Vec<usize>,
30    identity: String,
31
32    sys_catalog_reader: SysCatalogReaderRef,
33}
34
35impl SysRowSeqScanExecutor {
36    pub fn new(
37        table_id: TableId,
38        schema: Schema,
39        column_indices: Vec<usize>,
40        identity: String,
41        sys_catalog_reader: SysCatalogReaderRef,
42    ) -> Self {
43        Self {
44            table_id,
45            schema,
46            column_indices,
47            identity,
48            sys_catalog_reader,
49        }
50    }
51}
52
/// Stateless builder type; it carries the [`BoxedExecutorBuilder`] impl that
/// constructs a [`SysRowSeqScanExecutor`] from a plan node.
pub struct SysRowSeqScanExecutorBuilder {}
54
55impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder {
56    async fn new_boxed_executor(
57        source: &ExecutorBuilder<'_>,
58        inputs: Vec<BoxedExecutor>,
59    ) -> Result<BoxedExecutor> {
60        ensure!(
61            inputs.is_empty(),
62            "Row sequential scan should not have input executor!"
63        );
64        let seq_scan_node = try_match_expand!(
65            source.plan_node().get_node_body().unwrap(),
66            NodeBody::SysRowSeqScan
67        )?;
68        let sys_catalog_reader = source.context().catalog_reader();
69
70        let table_id = seq_scan_node.get_table_id().into();
71
72        let column_indices = seq_scan_node
73            .column_descs
74            .iter()
75            .map(|d| d.column_id as usize)
76            .collect_vec();
77        let schema = Schema::new(
78            seq_scan_node
79                .column_descs
80                .iter()
81                .map(Into::into)
82                .collect_vec(),
83        );
84        Ok(Box::new(SysRowSeqScanExecutor::new(
85            table_id,
86            schema,
87            column_indices,
88            source.plan_node().get_identity().clone(),
89            sys_catalog_reader,
90        )))
91    }
92}
93
94impl Executor for SysRowSeqScanExecutor {
95    fn schema(&self) -> &Schema {
96        &self.schema
97    }
98
99    fn identity(&self) -> &str {
100        &self.identity
101    }
102
103    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
104        self.do_execute()
105    }
106}
107
108impl SysRowSeqScanExecutor {
109    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
110    async fn do_execute(self: Box<Self>) {
111        #[for_await]
112        for chunk in self.sys_catalog_reader.read_table(self.table_id) {
113            let chunk = chunk.map_err(BatchError::SystemTable)?;
114            yield chunk.project(&self.column_indices);
115        }
116    }
117}