risingwave_frontend/optimizer/plan_node/
stream_fs_fetch.rsuse std::rc::Rc;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{PbStreamFsFetch, StreamFsFetchNode};
use super::stream::prelude::*;
use super::{PlanBase, PlanRef, PlanTreeNodeUnary};
use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode};
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamFsFetch {
pub base: PlanBase<Stream>,
input: PlanRef,
core: generic::Source,
}
impl PlanTreeNodeUnary for StreamFsFetch {
fn input(&self) -> PlanRef {
self.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(input, self.core.clone())
}
}
impl_plan_tree_node_for_unary! { StreamFsFetch }
impl StreamFsFetch {
pub fn new(input: PlanRef, source: generic::Source) -> Self {
let base = PlanBase::new_stream_with_core(
&source,
Distribution::SomeShard,
source.catalog.as_ref().map_or(true, |s| s.append_only),
false,
FixedBitSet::with_capacity(source.column_catalog.len()),
MonotonicityMap::new(), );
Self {
base,
input,
core: source,
}
}
fn get_columns(&self) -> Vec<&str> {
self.core
.column_catalog
.iter()
.map(|column| column.name())
.collect()
}
pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
self.core.catalog.clone()
}
}
impl Distill for StreamFsFetch {
fn distill<'a>(&self) -> XmlNode<'a> {
let columns = self
.get_columns()
.iter()
.map(|ele| Pretty::from(ele.to_string()))
.collect();
let col = Pretty::Array(columns);
childless_record("StreamFsFetch", vec![("columns", col)])
}
}
impl ExprRewritable for StreamFsFetch {}
impl ExprVisitable for StreamFsFetch {}
impl StreamNode for StreamFsFetch {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
let source_catalog = self.source_catalog();
let source_inner = source_catalog.map(|source_catalog| {
let (with_properties, secret_refs) =
source_catalog.with_properties.clone().into_parts();
PbStreamFsFetch {
source_id: source_catalog.id,
source_name: source_catalog.name.clone(),
state_table: Some(
generic::Source::infer_internal_table_catalog(true)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
),
info: Some(source_catalog.info.clone()),
row_id_index: self.core.row_id_index.map(|index| index as _),
columns: self
.core
.column_catalog
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: source_catalog.rate_limit,
secret_refs,
}
});
NodeBody::StreamFsFetch(StreamFsFetchNode {
node_inner: source_inner,
})
}
}