Skip to main content

risingwave_frontend/optimizer/plan_node/
convert.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::catalog::FieldDisplay;
18use risingwave_pb::stream_plan::StreamScanType;
19
20use super::*;
21use crate::optimizer::property::RequiredDist;
22
23/// `ToStream` converts a logical plan node to streaming physical node
24/// with an optional required distribution.
25///
26/// when implement this trait you can choose the two ways
27/// - Implement `to_stream` and use the default implementation of `to_stream_with_dist_required`
28/// - Or, if the required distribution is given, there will be a better plan. For example a hash
29///   join with hash-key(a,b) and the plan is required hash-distributed by (a,b,c). you can
30///   implement `to_stream_with_dist_required`, and implement `to_stream` with
31///   `to_stream_with_dist_required(RequiredDist::Any)`. you can see [`LogicalProject`] as an
32///   example.
33pub trait ToStream {
34    /// `logical_rewrite_for_stream` will rewrite the logical node, and return (`new_plan_node`,
35    /// `col_mapping`), the `col_mapping` is for original columns have been changed into some other
36    /// position.
37    ///
38    /// Now it is used to:
39    /// 1. ensure every plan node's output having pk column
40    /// 2. add `row_count`() in every Agg
41    fn logical_rewrite_for_stream(
42        &self,
43        ctx: &mut RewriteStreamContext,
44    ) -> Result<(LogicalPlanRef, ColIndexMapping)>;
45
46    /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)`
47    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef>;
48
49    /// convert the plan to streaming physical plan and satisfy the required distribution
50    fn to_stream_with_dist_required(
51        &self,
52        required_dist: &RequiredDist,
53        ctx: &mut ToStreamContext,
54    ) -> Result<StreamPlanRef> {
55        let ret = self.to_stream(ctx)?;
56        required_dist.streaming_enforce_if_not_satisfies(ret)
57    }
58
59    fn try_better_locality(&self, _columns: &[usize]) -> Option<LogicalPlanRef> {
60        None
61    }
62}
63
64/// Try to enforce the locality requirement on the given columns.
65/// If a better plan can be found, return the better plan.
66/// If no better plan can be found, and locality backfill is enabled, wrap the plan
67/// with `LogicalLocalityProvider`.
68/// Otherwise, return the plan as is.
69pub fn try_enforce_locality_requirement(plan: LogicalPlanRef, columns: &[usize]) -> LogicalPlanRef {
70    assert!(!columns.is_empty());
71    if let Some(better_plan) = plan.try_better_locality(columns) {
72        better_plan
73    } else if plan.ctx().session_ctx().config().enable_locality_backfill() {
74        LogicalLocalityProvider::new(plan, columns.to_owned()).into()
75    } else {
76        // TODO: remove this when locality backfill is enabled by default
77        plan.ctx().inc_missed_locality_providers();
78        plan
79    }
80}
81
82pub fn stream_enforce_eowc_requirement(
83    ctx: OptimizerContextRef,
84    plan: StreamPlanRef,
85    emit_on_window_close: bool,
86) -> Result<StreamPlanRef> {
87    if emit_on_window_close && !plan.emit_on_window_close() {
88        let watermark_groups = plan.watermark_columns().grouped();
89        let n_watermark_groups = watermark_groups.len();
90        if n_watermark_groups == 0 {
91            Err(ErrorCode::NotSupported(
92                "The query cannot be executed in Emit-On-Window-Close mode.".to_owned(),
93                "Try define a watermark column in the source, or avoid aggregation without GROUP BY".to_owned(),
94            )
95            .into())
96        } else {
97            let first_watermark_group = watermark_groups.values().next().unwrap();
98            let watermark_col_idx = first_watermark_group.indices().next().unwrap();
99            if n_watermark_groups > 1 {
100                ctx.warn_to_user(format!(
101                    "There are multiple unrelated watermark columns in the query, the first one `{}` is used.",
102                    FieldDisplay(&plan.schema()[watermark_col_idx])
103                ));
104            }
105            Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
106        }
107    } else {
108        Ok(plan)
109    }
110}
111
112#[derive(Debug, Clone)]
113pub struct RewriteStreamContext {
114    share_rewrite_map: HashMap<PlanNodeId, (LogicalPlanRef, ColIndexMapping)>,
115    // Snapshot backfill needs upstream table primary-key semantics during logical rewrite
116    // so operators above `LogicalScan` can preserve hidden primary-key columns before
117    // `StreamTableScan` is built. Other backfill types keep logical stream-key semantics.
118    backfill_type: BackfillType,
119}
120
121impl RewriteStreamContext {
122    pub fn new_with_backfill_type(backfill_type: BackfillType) -> Self {
123        Self {
124            share_rewrite_map: HashMap::new(),
125            backfill_type,
126        }
127    }
128
129    pub fn backfill_type(&self) -> BackfillType {
130        self.backfill_type
131    }
132
133    pub fn add_rewrite_result(
134        &mut self,
135        plan_node_id: PlanNodeId,
136        plan_ref: LogicalPlanRef,
137        col_change: ColIndexMapping,
138    ) {
139        let prev = self
140            .share_rewrite_map
141            .insert(plan_node_id, (plan_ref, col_change));
142        assert!(prev.is_none());
143    }
144
145    pub fn get_rewrite_result(
146        &self,
147        plan_node_id: PlanNodeId,
148    ) -> Option<&(LogicalPlanRef, ColIndexMapping)> {
149        self.share_rewrite_map.get(&plan_node_id)
150    }
151}
152
/// Backfill strategy used when converting table scans to streaming plans.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum BackfillType {
    /// Serialized as `StreamScanType::UpstreamOnly`.
    Replicated,
    /// Frontend-only variant for snapshot-free sinks. It is serialized as
    /// `StreamScanType::UpstreamOnly`, but derives an upsert stream kind.
    UpstreamOnlySink,
    /// Serialized as `StreamScanType::ArrangementBackfill`.
    ArrangementBackfill,
    /// Serialized as `StreamScanType::SnapshotBackfill`.
    SnapshotBackfill,
    /// Frontend-only variant for sinks created with `since_timestamp`.
    /// It is serialized as `StreamScanType::SnapshotBackfill`, but derives
    /// the same upsert stream kind as upstream-only sinks.
    SnapshotBackfillSinceTimestamp,
}
166
167impl BackfillType {
168    pub fn without_snapshot(self) -> bool {
169        matches!(
170            self,
171            BackfillType::UpstreamOnlySink | BackfillType::SnapshotBackfillSinceTimestamp
172        )
173    }
174
175    pub fn is_snapshot_backfill(self) -> bool {
176        matches!(
177            self,
178            BackfillType::SnapshotBackfill | BackfillType::SnapshotBackfillSinceTimestamp
179        )
180    }
181
182    pub fn to_stream_scan_type(self, is_cross_db: bool) -> StreamScanType {
183        if is_cross_db {
184            return StreamScanType::CrossDbSnapshotBackfill;
185        }
186
187        match self {
188            BackfillType::Replicated | BackfillType::UpstreamOnlySink => {
189                StreamScanType::UpstreamOnly
190            }
191            BackfillType::ArrangementBackfill => StreamScanType::ArrangementBackfill,
192            BackfillType::SnapshotBackfill | BackfillType::SnapshotBackfillSinceTimestamp => {
193                StreamScanType::SnapshotBackfill
194            }
195        }
196    }
197}
198
199#[derive(Debug, Clone)]
200pub struct ToStreamContext {
201    share_to_stream_map: HashMap<PlanNodeId, StreamPlanRef>,
202    emit_on_window_close: bool,
203    backfill_type: BackfillType,
204}
205
206impl ToStreamContext {
207    pub fn new_with_backfill_type(emit_on_window_close: bool, backfill_type: BackfillType) -> Self {
208        Self {
209            share_to_stream_map: HashMap::new(),
210            emit_on_window_close,
211            backfill_type,
212        }
213    }
214
215    pub fn backfill_type(&self) -> BackfillType {
216        self.backfill_type
217    }
218
219    pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: StreamPlanRef) {
220        self.share_to_stream_map
221            .try_insert(plan_node_id, plan_ref)
222            .unwrap();
223    }
224
225    pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&StreamPlanRef> {
226        self.share_to_stream_map.get(&plan_node_id)
227    }
228
229    pub fn emit_on_window_close(&self) -> bool {
230        self.emit_on_window_close
231    }
232}
233
234/// `ToBatch` allows to convert a logical plan node to batch physical node
235/// with an optional required order.
236///
237/// The generated plan has single distribution and doesn't have any exchange nodes inserted.
238/// Use either [`ToLocalBatch`] or [`ToDistributedBatch`] after `ToBatch` to get a distributed plan.
239///
240/// To implement this trait you can choose one of the two ways:
241/// - Implement `to_batch` and use the default implementation of `to_batch_with_order_required`
242/// - Or, if a better plan can be generated when a required order is given, you can implement
243///   `to_batch_with_order_required`, and implement `to_batch` with
244///   `to_batch_with_order_required(&Order::any())`.
245pub trait ToBatch {
246    /// `to_batch` is equivalent to `to_batch_with_order_required(&Order::any())`
247    fn to_batch(&self) -> Result<BatchPlanRef>;
248    /// convert the plan to batch physical plan and satisfy the required Order
249    fn to_batch_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
250        let ret = self.to_batch()?;
251        required_order.enforce_if_not_satisfies(ret)
252    }
253}
254
255/// Converts a batch physical plan to local plan for local execution.
256///
257/// This is quite similar to `ToBatch`, but different in several ways. For example it converts
258/// scan to exchange + scan.
259pub trait ToLocalBatch {
260    fn to_local(&self) -> Result<BatchPlanRef>;
261
262    /// Convert the plan to batch local physical plan and satisfy the required Order
263    fn to_local_with_order_required(&self, required_order: &Order) -> Result<BatchPlanRef> {
264        let ret = self.to_local()?;
265        required_order.enforce_if_not_satisfies(ret)
266    }
267}
268
269/// `ToDistributedBatch` allows to convert a batch physical plan to distributed batch plan, by
270/// insert exchange node, with an optional required order and distributed.
271///
272/// To implement this trait you can choose one of the two ways:
273/// - Implement `to_distributed` and use the default implementation of
274///   `to_distributed_with_required`
275/// - Or, if a better plan can be generated when a required order is given, you can implement
276///   `to_distributed_with_required`, and implement `to_distributed` with
277///   `to_distributed_with_required(&Order::any(), &RequiredDist::Any)`
278pub trait ToDistributedBatch {
279    /// `to_distributed` is equivalent to `to_distributed_with_required(&Order::any(),
280    /// &RequiredDist::Any)`
281    fn to_distributed(&self) -> Result<BatchPlanRef>;
282    /// insert the exchange in batch physical plan to satisfy the required Distribution and Order.
283    fn to_distributed_with_required(
284        &self,
285        required_order: &Order,
286        required_dist: &RequiredDist,
287    ) -> Result<BatchPlanRef> {
288        let ret = self.to_distributed()?;
289        let ret = required_order.enforce_if_not_satisfies(ret)?;
290        required_dist.batch_enforce_if_not_satisfies(ret, required_order)
291    }
292}