// risingwave_frontend/optimizer/plan_node/stream_source_scan.rs
use std::rc::Rc;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols;
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::PbStreamNode;
use super::stream::prelude::*;
use super::utils::TableCatalogBuilder;
use super::{PlanBase, PlanRef};
use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode};
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::{Explain, TableCatalog};
/// `StreamSourceScan` is a leaf stream plan node that scans from a source.
///
/// Unlike ordinary stream nodes, it is lowered to protobuf via
/// `adhoc_to_stream_prost` (which also emits its upstream `Merge` input)
/// rather than through `StreamNode::to_stream_prost_body`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSourceScan {
    pub base: PlanBase<Stream>,
    // The generic source core shared with other source plan nodes.
    core: generic::Source,
}
// Leaf node: no plan-tree children.
impl_plan_tree_node_for_leaf! { StreamSourceScan }
impl StreamSourceScan {
    /// Builds the plan node from a [`generic::Source`] core.
    ///
    /// When the attached catalog says the source is shared, partition/offset
    /// columns are appended as *hidden* columns if they are not already
    /// present in the column catalog.
    pub fn new(mut core: generic::Source) -> Self {
        if let Some(source_catalog) = &core.catalog
            && source_catalog.info.is_shared()
        {
            // `columns_exist[i]` marks whether `additional_columns[i]` is
            // already in `core.column_catalog`; only the missing ones are
            // pushed, and they stay hidden from users.
            let (columns_exist, additional_columns) = source_add_partition_offset_cols(
                &core.column_catalog,
                &source_catalog.connector_name(),
            );
            for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
                if !existed {
                    core.column_catalog.push(ColumnCatalog::hidden(c));
                }
            }
        }
        let base = PlanBase::new_stream_with_core(
            &core,
            Distribution::SomeShard,
            // Append-only iff the catalog says so; defaults to true when no
            // catalog is attached.
            core.catalog.as_ref().map_or(true, |s| s.append_only),
            false, // not emit-on-window-close
            // Empty watermark-column set and no monotonicity information.
            FixedBitSet::with_capacity(core.column_catalog.len()),
            MonotonicityMap::new(),
        );
        Self { base, core }
    }

    /// Names of all output columns, including hidden ones.
    fn get_columns(&self) -> Vec<&str> {
        self.core
            .column_catalog
            .iter()
            .map(|column| column.name())
            .collect()
    }

    /// Returns the source catalog.
    ///
    /// # Panics
    /// Panics if the core carries no catalog — a source scan is always
    /// expected to have one.
    pub fn source_catalog(&self) -> Rc<SourceCatalog> {
        self.core
            .catalog
            .clone()
            .expect("source scan should have source catalog")
    }

    /// Infers the catalog of the backfill state table:
    /// `partition_id (varchar, order/pk) -> backfill_progress (jsonb)`.
    pub fn infer_internal_table_catalog() -> TableCatalog {
        let mut builder = TableCatalogBuilder::default();
        let key = Field {
            data_type: DataType::Varchar,
            name: "partition_id".to_string(),
            sub_fields: vec![],
            type_name: "".to_string(),
        };
        let value = Field {
            data_type: DataType::Jsonb,
            name: "backfill_progress".to_string(),
            sub_fields: vec![],
            type_name: "".to_string(),
        };
        let ordered_col_idx = builder.add_column(&key);
        builder.add_column(&value);
        builder.add_order_column(ordered_col_idx, OrderType::ascending());
        // No distribution key; read-prefix length 0.
        builder.build(vec![], 0)
    }

    /// Converts this node to a `SourceBackfill` prost stream node, with a
    /// `Merge` node (default body, identity "Upstream") as its single input —
    /// presumably a placeholder filled in later during fragment building;
    /// confirm against the fragmenter.
    ///
    /// This replaces the usual `StreamNode::to_stream_prost_body` path for
    /// this node (see the `unreachable!` there).
    ///
    /// # Panics
    /// Panics if the plan has no stream key, which indicates a planner bug.
    pub fn adhoc_to_stream_prost(
        &self,
        state: &mut BuildFragmentGraphState,
    ) -> SchedulerResult<PbStreamNode> {
        use risingwave_pb::stream_plan::*;
        let stream_key = self
            .stream_key()
            .unwrap_or_else(|| {
                panic!(
                    "should always have a stream key in the stream plan but not, sub plan: {}",
                    PlanRef::from(self.clone()).explain_to_string()
                )
            })
            .iter()
            .map(|x| *x as u32)
            .collect_vec();
        let source_catalog = self.source_catalog();
        // Secret references travel separately from plain connector properties.
        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
        let backfill = SourceBackfillNode {
            upstream_source_id: source_catalog.id,
            source_name: source_catalog.name.clone(),
            state_table: Some(
                Self::infer_internal_table_catalog()
                    .with_id(state.gen_table_id_wrapped())
                    .to_internal_table_prost(),
            ),
            info: Some(source_catalog.info.clone()),
            row_id_index: self.core.row_id_index.map(|index| index as _),
            columns: self
                .core
                .column_catalog
                .iter()
                .map(|c| c.to_protobuf())
                .collect_vec(),
            with_properties,
            rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
            secret_refs,
        };
        let fields = self.schema().to_prost();
        Ok(PbStreamNode {
            fields: fields.clone(),
            input: vec![
                // Upstream `Merge` input; same schema, empty stream key.
                PbStreamNode {
                    node_body: Some(PbNodeBody::Merge(Default::default())),
                    identity: "Upstream".into(),
                    fields,
                    stream_key: vec![],
                    ..Default::default()
                },
            ],
            node_body: Some(PbNodeBody::SourceBackfill(backfill)),
            stream_key,
            operator_id: self.base.id().0 as u64,
            identity: self.distill_to_string(),
            append_only: self.append_only(),
        })
    }
}
impl Distill for StreamSourceScan {
    /// Renders this node for `EXPLAIN` output as
    /// `StreamSourceScan { columns: [...] }`.
    fn distill<'a>(&self) -> XmlNode<'a> {
        let mut pretty_cols = Vec::new();
        for name in self.get_columns() {
            pretty_cols.push(Pretty::from(name.to_string()));
        }
        let fields = vec![("columns", Pretty::Array(pretty_cols))];
        childless_record("StreamSourceScan", fields)
    }
}
// This node contains no expressions, so the default no-op rewrite/visit
// implementations are sufficient.
impl ExprRewritable for StreamSourceScan {}
impl ExprVisitable for StreamSourceScan {}
impl StreamNode for StreamSourceScan {
    /// Deliberately unreachable: this node is converted to protobuf via
    /// `adhoc_to_stream_prost`, which also emits the upstream `Merge` input
    /// that this body-only hook cannot express.
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
        unreachable!("stream source scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead.")
    }
}