risingwave_frontend/optimizer/property/
stream_kind.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use risingwave_pb::stream_plan::stream_node::PbStreamKind;
18use static_assertions::const_assert_eq;
19
20/// The kind of the changelog stream output by a stream operator.
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
22pub enum StreamKind {
23    /// The stream contains `Insert`, `Delete`, `UpdateDelete`, and `UpdateInsert` operations.
24    ///
25    /// When a row is going to be updated or deleted, a `Delete` or `UpdateDelete` record
26    /// containing the complete old value will be emitted first, before the new value is emitted
27    /// as an `Insert` or `UpdateInsert` record.
28    Retract,
29
30    /// The stream contains only `Insert` operations.
31    AppendOnly,
32
33    /// The stream contains `Insert` and `Delete` operations.
34    /// When a row is going to be updated, only the new value is emitted as an `Insert` record.
35    /// When a row is going to be deleted, an incomplete `Delete` record may be emitted, where
36    /// only the primary key columns are guaranteed to be set.
37    ///
38    /// Stateful operators typically can not process such streams correctly. It must be converted
39    /// to `Retract` before being sent to stateful operators in this case.
40    Upsert,
41}
42
43impl Display for StreamKind {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        write!(
46            f,
47            "{}",
48            match self {
49                Self::Retract => "retract",
50                Self::AppendOnly => "append-only",
51                Self::Upsert => "upsert",
52            }
53        )
54    }
55}
56
57// Although there's a `to_protobuf` method, we have no way to avoid calling `as i32` to fit
58// `StreamKind` to the protobuf enum. We ensure their values are the same here for safety.
59const_assert_eq!(StreamKind::Retract as i32, PbStreamKind::Retract as i32);
60const_assert_eq!(
61    StreamKind::AppendOnly as i32,
62    PbStreamKind::AppendOnly as i32
63);
64const_assert_eq!(StreamKind::Upsert as i32, PbStreamKind::Upsert as i32);
65
66impl StreamKind {
67    /// Returns `true` if it's [`StreamKind::AppendOnly`].
68    pub fn is_append_only(self) -> bool {
69        matches!(self, Self::AppendOnly)
70    }
71
72    /// Returns the stream kind representing the merge (union) of the two.
73    ///
74    /// Note that there should be no conflict on the stream key between the two streams,
75    /// otherwise it will result in an "inconsistent" stream.
76    pub fn merge(self, other: Self) -> Self {
77        let any = |kind| self == kind || other == kind;
78
79        if any(Self::Upsert) {
80            Self::Upsert
81        } else if any(Self::Retract) {
82            Self::Retract
83        } else {
84            Self::AppendOnly
85        }
86    }
87
88    /// Converts the stream kind to the protobuf representation.
89    pub fn to_protobuf(self) -> PbStreamKind {
90        match self {
91            Self::Retract => PbStreamKind::Retract,
92            Self::AppendOnly => PbStreamKind::AppendOnly,
93            Self::Upsert => PbStreamKind::Upsert,
94        }
95    }
96}
97
98/// Reject upsert stream as input.
99macro_rules! reject_upsert_input {
100    ($input:expr) => {
101        reject_upsert_input!(
102            $input,
103            std::any::type_name::<Self>().split("::").last().unwrap()
104        )
105    };
106
107    ($input:expr, $curr:expr) => {{
108        use crate::optimizer::plan_node::Explain;
109        use crate::optimizer::property::StreamKind;
110
111        let kind = $input.stream_kind();
112        if let StreamKind::Upsert = kind {
113            risingwave_common::bail!(
114                "upsert stream is not supported as input of {}, plan:\n{}",
115                $curr,
116                $input.explain_to_string()
117            );
118        }
119        kind
120    }};
121}
122pub(crate) use reject_upsert_input;