risingwave_frontend/optimizer/property/
stream_kind.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17/// The kind of the changelog stream output by a stream operator.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
19pub enum StreamKind {
20    /// The stream contains only `Insert` operations.
21    AppendOnly,
22
23    /// The stream contains `Insert`, `Delete`, `UpdateDelete`, and `UpdateInsert` operations.
24    ///
25    /// When a row is going to be updated or deleted, a `Delete` or `UpdateDelete` record
26    /// containing the complete old value will be emitted first, before the new value is emitted
27    /// as an `Insert` or `UpdateInsert` record.
28    Retract,
29
30    /// The stream contains `Insert` and `Delete` operations.
31    /// When a row is going to be updated, only the new value is emitted as an `Insert` record.
32    /// When a row is going to be deleted, an incomplete `Delete` record may be emitted, where
33    /// only the primary key columns are guaranteed to be set.
34    ///
35    /// Stateful operators typically can not process such streams correctly. It must be converted
36    /// to `Retract` before being sent to stateful operators in this case.
37    Upsert,
38}
39
40impl Display for StreamKind {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(
43            f,
44            "{}",
45            match self {
46                Self::AppendOnly => "append-only",
47                Self::Retract => "retract",
48                Self::Upsert => "upsert",
49            }
50        )
51    }
52}
53
54impl StreamKind {
55    /// Returns `true` if it's [`StreamKind::AppendOnly`].
56    pub fn is_append_only(self) -> bool {
57        matches!(self, Self::AppendOnly)
58    }
59
60    /// Returns the stream kind representing the merge (union) of the two.
61    ///
62    /// Note that there should be no conflict on the stream key between the two streams,
63    /// otherwise it will result in an "inconsistent" stream.
64    pub fn merge(self, other: Self) -> Self {
65        self.max(other)
66    }
67}
68
69/// Reject upsert stream as input.
70macro_rules! reject_upsert_input {
71    ($input:expr) => {
72        reject_upsert_input!(
73            $input,
74            std::any::type_name::<Self>().split("::").last().unwrap()
75        )
76    };
77
78    ($input:expr, $curr:expr) => {{
79        use crate::optimizer::plan_node::Explain;
80        use crate::optimizer::property::StreamKind;
81
82        let kind = $input.stream_kind();
83        if let StreamKind::Upsert = kind {
84            risingwave_common::bail!(
85                "upsert stream is not supported as input of {}, plan:\n{}",
86                $curr,
87                $input.explain_to_string()
88            );
89        }
90        kind
91    }};
92}
93pub(crate) use reject_upsert_input;