risingwave_frontend/optimizer/property/stream_kind.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17/// The kind of the changelog stream output by a stream operator.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
19pub enum StreamKind {
20 /// The stream contains only `Insert` operations.
21 AppendOnly,
22
23 /// The stream contains `Insert`, `Delete`, `UpdateDelete`, and `UpdateInsert` operations.
24 ///
25 /// When a row is going to be updated or deleted, a `Delete` or `UpdateDelete` record
26 /// containing the complete old value will be emitted first, before the new value is emitted
27 /// as an `Insert` or `UpdateInsert` record.
28 Retract,
29
30 /// The stream contains `Insert` and `Delete` operations.
31 /// When a row is going to be updated, only the new value is emitted as an `Insert` record.
32 /// When a row is going to be deleted, an incomplete `Delete` record may be emitted, where
33 /// only the primary key columns are guaranteed to be set.
34 ///
35 /// Stateful operators typically can not process such streams correctly. It must be converted
36 /// to `Retract` before being sent to stateful operators in this case.
37 Upsert,
38}
39
40impl Display for StreamKind {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(
43 f,
44 "{}",
45 match self {
46 Self::AppendOnly => "append-only",
47 Self::Retract => "retract",
48 Self::Upsert => "upsert",
49 }
50 )
51 }
52}
53
54impl StreamKind {
55 /// Returns `true` if it's [`StreamKind::AppendOnly`].
56 pub fn is_append_only(self) -> bool {
57 matches!(self, Self::AppendOnly)
58 }
59
60 /// Returns the stream kind representing the merge (union) of the two.
61 ///
62 /// Note that there should be no conflict on the stream key between the two streams,
63 /// otherwise it will result in an "inconsistent" stream.
64 pub fn merge(self, other: Self) -> Self {
65 self.max(other)
66 }
67}
68
69/// Reject upsert stream as input.
70macro_rules! reject_upsert_input {
71 ($input:expr) => {
72 reject_upsert_input!(
73 $input,
74 std::any::type_name::<Self>().split("::").last().unwrap()
75 )
76 };
77
78 ($input:expr, $curr:expr) => {{
79 use crate::optimizer::plan_node::Explain;
80 use crate::optimizer::property::StreamKind;
81
82 let kind = $input.stream_kind();
83 if let StreamKind::Upsert = kind {
84 risingwave_common::bail!(
85 "upsert stream is not supported as input of {}, plan:\n{}",
86 $curr,
87 $input.explain_to_string()
88 );
89 }
90 kind
91 }};
92}
93pub(crate) use reject_upsert_input;