risingwave_frontend/optimizer/plan_visitor/
distributed_dml_visitor.rs1use std::rc::Rc;
16
17use risingwave_connector::source::ConnectorProperties;
18
19use super::{BatchPlanVisitor, DefaultBehavior, Merge};
20use crate::catalog::source_catalog::SourceCatalog;
21use crate::optimizer::plan_node::{BatchPlanRef as PlanRef, BatchSource};
22use crate::optimizer::plan_visitor::PlanVisitor;
23
/// Stateless batch-plan visitor used to decide whether a DML statement should
/// be executed in distributed mode.
///
/// The visitor carries no fields; all of its behavior lives in its
/// `BatchPlanVisitor` implementation and its associated functions.
#[derive(Clone, Debug, Default)]
pub struct DistributedDmlVisitor {}
26
27impl DistributedDmlVisitor {
28 pub fn dml_should_run_in_distributed(plan: PlanRef) -> bool {
29 if plan
30 .ctx()
31 .session_ctx()
32 .config()
33 .batch_enable_distributed_dml()
34 {
35 return true;
36 }
37 let mut visitor = DistributedDmlVisitor {};
38 visitor.visit(plan)
39 }
40
41 fn is_iceberg_source(source_catalog: &Rc<SourceCatalog>) -> bool {
42 let property = ConnectorProperties::extract(source_catalog.with_properties.clone(), false);
43 if let Ok(property) = property {
44 matches!(property, ConnectorProperties::Iceberg(_))
45 } else {
46 false
47 }
48 }
49}
50
51impl BatchPlanVisitor for DistributedDmlVisitor {
52 type Result = bool;
53
54 type DefaultBehavior = impl DefaultBehavior<Self::Result>;
55
56 fn default_behavior() -> Self::DefaultBehavior {
57 Merge(|a, b| a | b)
58 }
59
60 fn visit_batch_source(&mut self, batch_source: &BatchSource) -> bool {
61 if let Some(source_catalog) = &batch_source.core.catalog {
62 Self::is_iceberg_source(source_catalog)
63 } else {
64 false
65 }
66 }
67}