risingwave_frontend/optimizer/plan_visitor/
distributed_dml_visitor.rs1use std::rc::Rc;
16
17use risingwave_connector::source::ConnectorProperties;
18
19use super::{DefaultBehavior, Merge};
20use crate::PlanRef;
21use crate::catalog::source_catalog::SourceCatalog;
22use crate::optimizer::plan_node::{BatchSource, LogicalSource, StreamSource};
23use crate::optimizer::plan_visitor::PlanVisitor;
24
25#[derive(Debug, Clone, Default)]
26pub struct DistributedDmlVisitor {}
27
28impl DistributedDmlVisitor {
29 pub fn dml_should_run_in_distributed(plan: PlanRef) -> bool {
30 if plan
31 .ctx()
32 .session_ctx()
33 .config()
34 .batch_enable_distributed_dml()
35 {
36 return true;
37 }
38 let mut visitor = DistributedDmlVisitor {};
39 visitor.visit(plan)
40 }
41
42 fn is_iceberg_source(source_catalog: &Rc<SourceCatalog>) -> bool {
43 let property = ConnectorProperties::extract(source_catalog.with_properties.clone(), false);
44 if let Ok(property) = property {
45 matches!(property, ConnectorProperties::Iceberg(_))
46 } else {
47 false
48 }
49 }
50}
51
52impl PlanVisitor for DistributedDmlVisitor {
53 type Result = bool;
54
55 type DefaultBehavior = impl DefaultBehavior<Self::Result>;
56
57 fn default_behavior() -> Self::DefaultBehavior {
58 Merge(|a, b| a | b)
59 }
60
61 fn visit_batch_source(&mut self, batch_source: &BatchSource) -> bool {
62 if let Some(source_catalog) = &batch_source.core.catalog {
63 Self::is_iceberg_source(source_catalog)
64 } else {
65 false
66 }
67 }
68
69 fn visit_logical_source(&mut self, logical_source: &LogicalSource) -> bool {
70 if let Some(source_catalog) = &logical_source.core.catalog {
71 Self::is_iceberg_source(source_catalog)
72 } else {
73 false
74 }
75 }
76
77 fn visit_stream_source(&mut self, stream_source: &StreamSource) -> bool {
78 if let Some(source_catalog) = &stream_source.core.catalog {
79 Self::is_iceberg_source(source_catalog)
80 } else {
81 false
82 }
83 }
84}