// risingwave_batch/executor/sys_row_seq_scan.rs
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{ColumnDesc, Schema, SysCatalogReaderRef, TableId};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;
/// Batch executor that sequentially scans a system catalog table and emits
/// its rows as projected data chunks.
pub struct SysRowSeqScanExecutor {
// Identifier of the table handed to the catalog reader's `read_table`.
table_id: TableId,
// Schema reported through `Executor::schema`.
schema: Schema,
// Column positions passed to `DataChunk::project` for every chunk read.
column_indices: Vec<usize>,
// Human-readable identity reported through `Executor::identity`.
identity: String,
// Reader used to stream the table's chunks.
sys_catalog_reader: SysCatalogReaderRef,
}
impl SysRowSeqScanExecutor {
pub fn new(
table_id: TableId,
schema: Schema,
column_indices: Vec<usize>,
identity: String,
sys_catalog_reader: SysCatalogReaderRef,
) -> Self {
Self {
table_id,
schema,
column_indices,
identity,
sys_catalog_reader,
}
}
}
/// Stateless builder type whose `BoxedExecutorBuilder` implementation creates
/// a `SysRowSeqScanExecutor` from a plan node.
pub struct SysRowSeqScanExecutorBuilder {}
#[async_trait::async_trait]
impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
ensure!(
inputs.is_empty(),
"Row sequential scan should not have input executor!"
);
let seq_scan_node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::SysRowSeqScan
)?;
let sys_catalog_reader = source.context.catalog_reader();
let table_id = seq_scan_node.get_table_id().into();
let column_descs = seq_scan_node
.column_descs
.iter()
.map(|column_desc| ColumnDesc::from(column_desc.clone()))
.collect_vec();
let column_indices = column_descs
.iter()
.map(|d| d.column_id.get_id() as usize)
.collect_vec();
let schema = Schema::new(column_descs.iter().map(Into::into).collect_vec());
Ok(Box::new(SysRowSeqScanExecutor::new(
table_id,
schema,
column_indices,
source.plan_node().get_identity().clone(),
sys_catalog_reader,
)))
}
}
impl Executor for SysRowSeqScanExecutor {
    /// Returns the schema this executor was constructed with.
    fn schema(&self) -> &Schema {
        let Self { schema, .. } = self;
        schema
    }

    /// Returns the identity string this executor was constructed with.
    fn identity(&self) -> &str {
        self.identity.as_str()
    }

    /// Consumes the executor and returns the stream produced by `do_execute`.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        Self::do_execute(self)
    }
}
impl SysRowSeqScanExecutor {
    /// Streams the table's chunks from the catalog reader, projecting each one
    /// onto `column_indices` before yielding it.
    ///
    /// A read failure is wrapped in `BatchError::SystemTable` and ends the
    /// stream through `?`.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        #[for_await]
        for next in self.sys_catalog_reader.read_table(self.table_id) {
            let full_chunk = next.map_err(BatchError::SystemTable)?;
            let projected = full_chunk.project(&self.column_indices);
            yield projected;
        }
    }
}