use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::Field;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::PbStreamNode;

use super::stream::prelude::*;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef};
use crate::handler::create_source::debezium_cdc_source_schema;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::{Explain, TableCatalog};
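
/// `StreamCdcTableScan` is a stream plan node that scans a table from an external
/// CDC source. It bypasses the usual `to_stream_prost` path: [`Self::adhoc_to_stream_prost`]
/// expands it into a `Merge -> CdcFilter -> Exchange -> StreamCdcScan` chain.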
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamCdcTableScan {
pub base: PlanBase<Stream>,
core: generic::CdcScan,
}

impl StreamCdcTableScan {
pub fn new(core: generic::CdcScan) -> Self {
let distribution = Distribution::SomeShard;
let base = PlanBase::new_stream_with_core(
&core,
distribution,
core.append_only(),
false,
core.watermark_columns(),
core.columns_monotonicity(),
);
Self { base, core }
}

    pub fn table_name(&self) -> &str {
&self.core.table_name
}

    pub fn core(&self) -> &generic::CdcScan {
&self.core
}
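
    /// Build the catalog for the CDC backfill state table.
    /// Schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |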
pub fn build_backfill_state_catalog(
&self,
state: &mut BuildFragmentGraphState,
) -> TableCatalog {
let mut catalog_builder = TableCatalogBuilder::default();
let upstream_schema = &self.core.get_table_columns();
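        // `split_id` is the sole order key of the state table.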
catalog_builder.add_column(&Field::with_name(DataType::Varchar, "split_id"));
catalog_builder.add_order_column(0, OrderType::ascending());
for col_order in self.core.primary_key() {
let col = &upstream_schema[col_order.column_index];
catalog_builder.add_column(&Field::from(col));
}
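        // Whether backfill has finished, the number of rows consumed so far,
        // and the latest CDC offset are persisted per split.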
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
catalog_builder.add_column(&Field::with_name(DataType::Jsonb, "cdc_offset"));
catalog_builder
.build(vec![], 1)
.with_id(state.gen_table_id_wrapped())
}
}

impl_plan_tree_node_for_leaf! { StreamCdcTableScan }

impl Distill for StreamCdcTableScan {
fn distill<'a>(&self) -> XmlNode<'a> {
let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(4);
vec.push(("table", Pretty::from(self.core.table_name.clone())));
vec.push(("columns", self.core.columns_pretty(verbose)));
if verbose {
let pk = IndicesDisplay {
indices: self.stream_key().unwrap_or_default(),
schema: self.base.schema(),
};
vec.push(("pk", pk.distill()));
let dist = Pretty::display(&DistributionDisplay {
distribution: self.distribution(),
input_schema: self.base.schema(),
});
vec.push(("dist", dist));
}
childless_record("StreamCdcTableScan", vec)
}
}

impl StreamNode for StreamCdcTableScan {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
unreachable!("stream scan cannot be converted into a prost body -- call `adhoc_to_stream_prost` instead.")
}
}

impl StreamCdcTableScan {
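    /// Lower this node into a protobuf stream fragment of the shape
    /// `Merge (upstream CDC source) -> CdcFilter -> Exchange (simple) -> StreamCdcScan`.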
pub fn adhoc_to_stream_prost(
&self,
state: &mut BuildFragmentGraphState,
) -> SchedulerResult<PbStreamNode> {
use risingwave_pb::stream_plan::*;
let stream_key = self
.stream_key()
.unwrap_or_else(|| {
panic!(
"should always have a stream key in the stream plan but not, sub plan: {}",
PlanRef::from(self.clone()).explain_to_string()
)
})
.iter()
.map(|x| *x as u32)
.collect_vec();
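        // The upstream CDC source emits rows in the Debezium source schema
        // (payload plus metadata columns), not in the downstream table schema.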
let cdc_source_schema = {
let columns = debezium_cdc_source_schema();
columns
.into_iter()
.map(|c| Field::from(c.column_desc).to_prost())
.collect_vec()
};
let catalog = self
.build_backfill_state_catalog(state)
.to_internal_table_prost();
let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;
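        // Filter upstream change events down to those belonging to this external table.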
let filter_expr =
Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());
let filter_operator_id = self.core.ctx.next_plan_node_id();
let filter_stream_node = StreamNode {
operator_id: filter_operator_id.0 as _,
input: vec![
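                // Placeholder merge node for the upstream CDC source; its body is
                // filled in when the fragment graph is built.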
PbStreamNode {
node_body: Some(PbNodeBody::Merge(Default::default())),
identity: "Upstream".into(),
fields: cdc_source_schema.clone(),
                    stream_key: vec![],
                    ..Default::default()
},
],
            stream_key: vec![],
            append_only: true,
identity: "StreamCdcFilter".to_string(),
fields: cdc_source_schema.clone(),
node_body: Some(PbNodeBody::CdcFilter(CdcFilterNode {
search_condition: Some(filter_expr.to_expr_proto()),
upstream_source_id,
})),
};
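        // A simple exchange has a single downstream, funneling all filtered events
        // into the one CDC backfill fragment.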
let exchange_operator_id = self.core.ctx.next_plan_node_id();
let exchange_stream_node = StreamNode {
operator_id: exchange_operator_id.0 as _,
input: vec![filter_stream_node],
            stream_key: vec![],
            append_only: true,
identity: "Exchange".to_string(),
fields: cdc_source_schema.clone(),
node_body: Some(PbNodeBody::Exchange(ExchangeNode {
strategy: Some(DispatchStrategy {
r#type: DispatcherType::Simple as _,
                    dist_key_indices: vec![],
                    output_indices: (0..cdc_source_schema.len() as u32).collect(),
}),
})),
};
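        // The backfill executor reads both the output columns and the primary-key
        // columns of the upstream table; results are projected via `output_indices`.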
let upstream_column_ids = self
.core
.output_and_pk_column_ids()
.iter()
.map(ColumnId::get_id)
.collect_vec();
let output_indices = self
.core
.output_column_ids()
.iter()
.map(|i| {
upstream_column_ids
.iter()
.position(|&x| x == i.get_id())
.unwrap() as u32
})
.collect_vec();
tracing::debug!(
output_column_ids=?self.core.output_column_ids(),
?upstream_column_ids,
?output_indices,
"stream cdc table scan output indices"
);
let options = self.core.options.to_proto();
let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode {
table_id: upstream_source_id,
upstream_column_ids,
output_indices,
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
disable_backfill: options.disable_backfill,
options: Some(options),
});
Ok(PbStreamNode {
fields: self.schema().to_prost(),
input: vec![exchange_stream_node],
node_body: Some(stream_scan_body),
stream_key,
operator_id: self.base.id().0 as u64,
identity: self.distill_to_string(),
append_only: self.append_only(),
})
}
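
    /// Build the filter condition for the `CdcFilter` node. Its input rows have the
    /// layout `(payload, offset, table name)`, so compare the table-name column
    /// (index 2) against the fully qualified external table name.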
pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
FunctionCall::new(
ExprType::Equal,
vec![
InputRef::new(2, DataType::Varchar).into(),
ExprImpl::literal_varchar(cdc_table_name.into()),
],
)
.unwrap()
.into()
}
}

impl ExprRewritable for StreamCdcTableScan {
fn has_rewritable_expr(&self) -> bool {
true
}
fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut core = self.core.clone();
core.rewrite_exprs(r);
Self::new(core).into()
}
}

impl ExprVisitable for StreamCdcTableScan {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
self.core.visit_exprs(v);
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

    use risingwave_common::row::OwnedRow;
use risingwave_common::types::{JsonbVal, ScalarImpl};

    use super::*;

    #[tokio::test]
async fn test_cdc_filter_expr() {
let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();
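        // Rows mimic the CdcFilter input layout: (payload, offset, table name).
        // `row3` carries transaction metadata rather than a data-change event.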
let row1 = OwnedRow::new(vec![
Some(t1_json.into()),
Some(r#"{"file": "1.binlog", "pos": 100}"#.into()),
Some("public.t2".into()),
]);
let row2 = OwnedRow::new(vec![
Some(t2_json.into()),
Some(r#"{"file": "2.binlog", "pos": 100}"#.into()),
Some("abs.t2".into()),
]);
let row3 = OwnedRow::new(vec![
Some(trx_json.into()),
Some(r#"{"file": "3.binlog", "pos": 100}"#.into()),
Some("public.t2".into()),
]);
let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
assert_eq!(
filter_expr.eval_row(&row1).await.unwrap(),
Some(ScalarImpl::Bool(true))
);
assert_eq!(
filter_expr.eval_row(&row2).await.unwrap(),
Some(ScalarImpl::Bool(false))
);
assert_eq!(
filter_expr.eval_row(&row3).await.unwrap(),
Some(ScalarImpl::Bool(true))
        );
}
}