// risingwave_frontend/optimizer/plan_visitor/distributed_dml_visitor.rs
use std::rc::Rc;
use risingwave_connector::source::ConnectorProperties;
use super::{DefaultBehavior, Merge};
use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::plan_node::{BatchSource, LogicalSource, StreamSource};
use crate::optimizer::plan_visitor::PlanVisitor;
use crate::PlanRef;
/// Stateless plan visitor that reports whether a plan contains an Iceberg
/// source node.
///
/// The visitor carries no fields; all of its logic lives in its inherent
/// methods and its `PlanVisitor` implementation.
#[derive(Clone, Debug, Default)]
pub struct DistributedDmlVisitor {}
impl DistributedDmlVisitor {
    /// Decides whether a DML plan should run in distributed mode.
    ///
    /// Returns `true` immediately when the session configuration flag
    /// `batch_enable_distributed_dml` is set. Otherwise the plan is walked
    /// with a fresh visitor and the visitor's result is returned, i.e.
    /// whether an Iceberg source was found.
    pub fn dml_should_run_in_distributed(plan: PlanRef) -> bool {
        // Read the flag in its own statement so every temporary created by
        // the config lookup is released before `plan` is moved into `visit`.
        let forced_by_session = plan
            .ctx()
            .session_ctx()
            .config()
            .batch_enable_distributed_dml();

        // `||` short-circuits: the plan is only visited when the flag is off.
        forced_by_session || {
            let mut visitor = DistributedDmlVisitor {};
            visitor.visit(plan)
        }
    }

    /// Returns `true` when the connector properties extracted from
    /// `source_catalog.with_properties` are the `Iceberg` variant.
    ///
    /// A failed extraction is treated as "not Iceberg" and yields `false`.
    fn is_iceberg_source(source_catalog: &Rc<SourceCatalog>) -> bool {
        matches!(
            ConnectorProperties::extract(source_catalog.with_properties.clone(), false),
            Ok(ConnectorProperties::Iceberg(_))
        )
    }
}
/// Visitor implementation producing a `bool`: source nodes answer whether
/// their catalog describes an Iceberg source, and partial results are
/// combined with a bitwise OR.
impl PlanVisitor for DistributedDmlVisitor {
    type Result = bool;

    type DefaultBehavior = impl DefaultBehavior<Self::Result>;

    /// Combines two partial results with `|`, so a single `true` is enough
    /// to make the merged result `true`.
    fn default_behavior() -> Self::DefaultBehavior {
        Merge(|lhs, rhs| lhs | rhs)
    }

    /// A batch source counts only if it has a catalog and that catalog is
    /// an Iceberg source; a missing catalog yields `false`.
    fn visit_batch_source(&mut self, batch_source: &BatchSource) -> bool {
        batch_source
            .core
            .catalog
            .as_ref()
            .is_some_and(Self::is_iceberg_source)
    }

    /// A logical source counts only if it has a catalog and that catalog is
    /// an Iceberg source; a missing catalog yields `false`.
    fn visit_logical_source(&mut self, logical_source: &LogicalSource) -> bool {
        logical_source
            .core
            .catalog
            .as_ref()
            .is_some_and(Self::is_iceberg_source)
    }

    /// A stream source counts only if it has a catalog and that catalog is
    /// an Iceberg source; a missing catalog yields `false`.
    fn visit_stream_source(&mut self, stream_source: &StreamSource) -> bool {
        stream_source
            .core
            .catalog
            .as_ref()
            .is_some_and(Self::is_iceberg_source)
    }
}