risingwave_batch_executors/executor/
sys_row_seq_scan.rs1use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::DataChunk;
18use risingwave_common::catalog::{ColumnDesc, Schema, SysCatalogReaderRef, TableId};
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20
21use crate::error::{BatchError, Result};
22use crate::executor::{
23 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
24};
25
26pub struct SysRowSeqScanExecutor {
27 table_id: TableId,
28 schema: Schema,
29 column_indices: Vec<usize>,
30 identity: String,
31
32 sys_catalog_reader: SysCatalogReaderRef,
33}
34
35impl SysRowSeqScanExecutor {
36 pub fn new(
37 table_id: TableId,
38 schema: Schema,
39 column_indices: Vec<usize>,
40 identity: String,
41 sys_catalog_reader: SysCatalogReaderRef,
42 ) -> Self {
43 Self {
44 table_id,
45 schema,
46 column_indices,
47 identity,
48 sys_catalog_reader,
49 }
50 }
51}
52
/// Stateless builder type; its [`BoxedExecutorBuilder`] implementation turns a
/// `SysRowSeqScan` plan node into a [`SysRowSeqScanExecutor`].
pub struct SysRowSeqScanExecutorBuilder {}
54
55impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder {
56 async fn new_boxed_executor(
57 source: &ExecutorBuilder<'_>,
58 inputs: Vec<BoxedExecutor>,
59 ) -> Result<BoxedExecutor> {
60 ensure!(
61 inputs.is_empty(),
62 "Row sequential scan should not have input executor!"
63 );
64 let seq_scan_node = try_match_expand!(
65 source.plan_node().get_node_body().unwrap(),
66 NodeBody::SysRowSeqScan
67 )?;
68 let sys_catalog_reader = source.context().catalog_reader();
69
70 let table_id = seq_scan_node.get_table_id().into();
71 let column_descs = seq_scan_node
72 .column_descs
73 .iter()
74 .map(|column_desc| ColumnDesc::from(column_desc.clone()))
75 .collect_vec();
76
77 let column_indices = column_descs
78 .iter()
79 .map(|d| d.column_id.get_id() as usize)
80 .collect_vec();
81 let schema = Schema::new(column_descs.iter().map(Into::into).collect_vec());
82 Ok(Box::new(SysRowSeqScanExecutor::new(
83 table_id,
84 schema,
85 column_indices,
86 source.plan_node().get_identity().clone(),
87 sys_catalog_reader,
88 )))
89 }
90}
91
92impl Executor for SysRowSeqScanExecutor {
93 fn schema(&self) -> &Schema {
94 &self.schema
95 }
96
97 fn identity(&self) -> &str {
98 &self.identity
99 }
100
101 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
102 self.do_execute()
103 }
104}
105
106impl SysRowSeqScanExecutor {
107 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
108 async fn do_execute(self: Box<Self>) {
109 #[for_await]
110 for chunk in self.sys_catalog_reader.read_table(self.table_id) {
111 let chunk = chunk.map_err(BatchError::SystemTable)?;
112 yield chunk.project(&self.column_indices);
113 }
114 }
115}