risingwave_frontend/optimizer/property/
stream_kind.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use risingwave_pb::stream_plan::stream_node::PbStreamKind;
18use static_assertions::const_assert_eq;
19
/// Describes which changelog operations a stream operator may emit.
// NOTE: keep the variant order. It determines the `as i32` values, which are asserted at
// compile time in this file to equal those of `PbStreamKind`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StreamKind {
    /// A changelog that may carry `Insert`, `Delete`, `UpdateDelete`, and `UpdateInsert`
    /// operations.
    ///
    /// Every update or deletion first retracts the row with a `Delete` or `UpdateDelete`
    /// record holding the complete old value. For an update, the new value then follows as
    /// an `Insert` or `UpdateInsert` record.
    Retract,

    /// A changelog made up exclusively of `Insert` operations.
    AppendOnly,

    /// A changelog that may carry `Insert` and `Delete` operations.
    ///
    /// An update is represented solely by an `Insert` record with the new value; the old
    /// value is not retracted. A deletion may be represented by an incomplete `Delete`
    /// record in which only the primary key columns are guaranteed to be set.
    ///
    /// Stateful operators typically cannot process such a stream correctly, so it must be
    /// converted to `Retract` before being sent to them.
    Upsert,
}
42
43impl Display for StreamKind {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        write!(
46            f,
47            "{}",
48            match self {
49                Self::Retract => "retract",
50                Self::AppendOnly => "append-only",
51                Self::Upsert => "upsert",
52            }
53        )
54    }
55}
56
57// Although there's a `to_protobuf` method, we have no way to avoid calling `as i32` to fit
58// `StreamKind` to the protobuf enum. We ensure their values are the same here for safety.
59const_assert_eq!(StreamKind::Retract as i32, PbStreamKind::Retract as i32);
60const_assert_eq!(
61    StreamKind::AppendOnly as i32,
62    PbStreamKind::AppendOnly as i32
63);
64const_assert_eq!(StreamKind::Upsert as i32, PbStreamKind::Upsert as i32);
65
66impl StreamKind {
67    /// Returns `true` if it's [`StreamKind::Retract`].
68    pub fn is_retract(self) -> bool {
69        matches!(self, Self::Retract)
70    }
71
72    /// Returns `true` if it's [`StreamKind::AppendOnly`].
73    pub fn is_append_only(self) -> bool {
74        matches!(self, Self::AppendOnly)
75    }
76
77    /// Returns `true` if it's [`StreamKind::Upsert`].
78    pub fn is_upsert(self) -> bool {
79        matches!(self, Self::Upsert)
80    }
81
82    /// Returns the stream kind representing the merge (union) of the two.
83    ///
84    /// Note that there should be no conflict on the stream key between the two streams,
85    /// otherwise it will result in an "inconsistent" stream.
86    pub fn merge(self, other: Self) -> Self {
87        let any = |kind| self == kind || other == kind;
88
89        if any(Self::Upsert) {
90            Self::Upsert
91        } else if any(Self::Retract) {
92            Self::Retract
93        } else {
94            Self::AppendOnly
95        }
96    }
97
98    /// Converts the stream kind to the protobuf representation.
99    pub fn to_protobuf(self) -> PbStreamKind {
100        match self {
101            Self::Retract => PbStreamKind::Retract,
102            Self::AppendOnly => PbStreamKind::AppendOnly,
103            Self::Upsert => PbStreamKind::Upsert,
104        }
105    }
106}
107
/// Rejects an upsert stream as the input of a stream operator.
///
/// Expands to a block expression that evaluates to the stream kind of `$input` when it is not
/// `StreamKind::Upsert`. Otherwise `risingwave_common::bail!` is invoked with an error message
/// that names the current operator and includes the explained plan of `$input`.
///
/// - `reject_upsert_input!(input)` uses the last `::`-separated segment of
///   `std::any::type_name::<Self>()` as the operator name, so `Self` must be in scope.
/// - `reject_upsert_input!(input, curr)` uses `curr` as the operator name.
///
/// `$input` is evaluated exactly once; `$curr` is only evaluated when the input is rejected.
macro_rules! reject_upsert_input {
    ($input:expr) => {
        reject_upsert_input!(
            $input,
            std::any::type_name::<Self>().split("::").last().unwrap()
        )
    };

    ($input:expr, $curr:expr) => {{
        use crate::optimizer::plan_node::Explain;
        use crate::optimizer::property::StreamKind;

        // Bind the input once so that an argument expression with side effects or a
        // non-trivial cost is not evaluated a second time when building the error message.
        let input = &$input;
        let kind = input.stream_kind();
        if let StreamKind::Upsert = kind {
            risingwave_common::bail!(
                "upsert stream is not supported as input of {}, plan:\n{}",
                $curr,
                input.explain_to_string()
            );
        }
        kind
    }};
}
132pub(crate) use reject_upsert_input;