risingwave_batch_executors/executor/
sys_row_seq_scan.rs1use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::DataChunk;
18use risingwave_common::catalog::{Schema, SysCatalogReaderRef, TableId};
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20
21use crate::error::{BatchError, Result};
22use crate::executor::{
23 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
24};
25
26pub struct SysRowSeqScanExecutor {
27 table_id: TableId,
28 schema: Schema,
29 column_indices: Vec<usize>,
30 identity: String,
31
32 sys_catalog_reader: SysCatalogReaderRef,
33}
34
35impl SysRowSeqScanExecutor {
36 pub fn new(
37 table_id: TableId,
38 schema: Schema,
39 column_indices: Vec<usize>,
40 identity: String,
41 sys_catalog_reader: SysCatalogReaderRef,
42 ) -> Self {
43 Self {
44 table_id,
45 schema,
46 column_indices,
47 identity,
48 sys_catalog_reader,
49 }
50 }
51}
52
53pub struct SysRowSeqScanExecutorBuilder {}
54
55impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder {
56 async fn new_boxed_executor(
57 source: &ExecutorBuilder<'_>,
58 inputs: Vec<BoxedExecutor>,
59 ) -> Result<BoxedExecutor> {
60 ensure!(
61 inputs.is_empty(),
62 "Row sequential scan should not have input executor!"
63 );
64 let seq_scan_node = try_match_expand!(
65 source.plan_node().get_node_body().unwrap(),
66 NodeBody::SysRowSeqScan
67 )?;
68 let sys_catalog_reader = source.context().catalog_reader();
69
70 let table_id = seq_scan_node.get_table_id().into();
71
72 let column_indices = seq_scan_node
73 .column_descs
74 .iter()
75 .map(|d| d.column_id as usize)
76 .collect_vec();
77 let schema = Schema::new(
78 seq_scan_node
79 .column_descs
80 .iter()
81 .map(Into::into)
82 .collect_vec(),
83 );
84 Ok(Box::new(SysRowSeqScanExecutor::new(
85 table_id,
86 schema,
87 column_indices,
88 source.plan_node().get_identity().clone(),
89 sys_catalog_reader,
90 )))
91 }
92}
93
94impl Executor for SysRowSeqScanExecutor {
95 fn schema(&self) -> &Schema {
96 &self.schema
97 }
98
99 fn identity(&self) -> &str {
100 &self.identity
101 }
102
103 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
104 self.do_execute()
105 }
106}
107
108impl SysRowSeqScanExecutor {
109 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
110 async fn do_execute(self: Box<Self>) {
111 #[for_await]
112 for chunk in self.sys_catalog_reader.read_table(self.table_id) {
113 let chunk = chunk.map_err(BatchError::SystemTable)?;
114 yield chunk.project(&self.column_indices);
115 }
116 }
117}