risingwave_frontend/optimizer/property/stream_kind.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use risingwave_pb::stream_plan::stream_node::PbStreamKind;
18use static_assertions::const_assert_eq;
19
/// Describes which changelog operations a stream operator may emit on its output.
///
/// The variant order matters: each variant's discriminant is statically asserted in this file
/// to equal that of the matching `PbStreamKind` variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StreamKind {
    /// May carry `Insert`, `Delete`, `UpdateDelete`, and `UpdateInsert` operations.
    ///
    /// Whenever a row is about to be updated or deleted, a `Delete` or `UpdateDelete` record
    /// holding the complete old value is emitted first; the new value then follows as an
    /// `Insert` or `UpdateInsert` record.
    Retract,

    /// Carries `Insert` operations only.
    AppendOnly,

    /// Carries `Insert` and `Delete` operations.
    ///
    /// - On update, only the new value is emitted, as an `Insert` record.
    /// - On delete, the emitted `Delete` record may be incomplete: only the primary key
    ///   columns are guaranteed to be set.
    ///
    /// Stateful operators typically cannot process such a stream correctly, so in that case
    /// it must be converted to `Retract` before being sent to them.
    Upsert,
}
42
43impl Display for StreamKind {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(
46 f,
47 "{}",
48 match self {
49 Self::Retract => "retract",
50 Self::AppendOnly => "append-only",
51 Self::Upsert => "upsert",
52 }
53 )
54 }
55}
56
// `StreamKind` has a `to_protobuf` method, yet fitting it to the protobuf enum still cannot avoid
// an `as i32` cast. For safety, statically assert here that every variant has exactly the same
// discriminant as its `PbStreamKind` counterpart.
const_assert_eq!(StreamKind::Retract as i32, PbStreamKind::Retract as i32);
const_assert_eq!(
    StreamKind::AppendOnly as i32,
    PbStreamKind::AppendOnly as i32
);
const_assert_eq!(StreamKind::Upsert as i32, PbStreamKind::Upsert as i32);
65
66impl StreamKind {
67 /// Returns `true` if it's [`StreamKind::AppendOnly`].
68 pub fn is_append_only(self) -> bool {
69 matches!(self, Self::AppendOnly)
70 }
71
72 /// Returns the stream kind representing the merge (union) of the two.
73 ///
74 /// Note that there should be no conflict on the stream key between the two streams,
75 /// otherwise it will result in an "inconsistent" stream.
76 pub fn merge(self, other: Self) -> Self {
77 let any = |kind| self == kind || other == kind;
78
79 if any(Self::Upsert) {
80 Self::Upsert
81 } else if any(Self::Retract) {
82 Self::Retract
83 } else {
84 Self::AppendOnly
85 }
86 }
87
88 /// Converts the stream kind to the protobuf representation.
89 pub fn to_protobuf(self) -> PbStreamKind {
90 match self {
91 Self::Retract => PbStreamKind::Retract,
92 Self::AppendOnly => PbStreamKind::AppendOnly,
93 Self::Upsert => PbStreamKind::Upsert,
94 }
95 }
96}
97
/// Reject upsert stream as input.
///
/// Evaluates to the stream kind of `$input` (as returned by its `stream_kind()` method) when
/// that kind is not `StreamKind::Upsert`. Otherwise it invokes `risingwave_common::bail!`
/// (expected to return an error from the enclosing function) with a message naming the current
/// operator and containing the explained input plan.
///
/// - `reject_upsert_input!(input)`: the operator name is the last path segment of the type name
///   of `Self`, with any generic arguments stripped.
/// - `reject_upsert_input!(input, curr)`: `curr` is used as the operator name in the message.
macro_rules! reject_upsert_input {
    ($input:expr) => {
        reject_upsert_input!($input, {
            let full_name = std::any::type_name::<Self>();
            // Cut off generic arguments first; otherwise splitting on `::` would pick up the
            // tail of a type argument (e.g. `Arg>`) instead of the name of `Self`.
            let base_name = full_name.split('<').next().unwrap_or(full_name);
            base_name.rsplit("::").next().unwrap_or(base_name)
        })
    };

    ($input:expr, $curr:expr) => {{
        use crate::optimizer::plan_node::Explain;
        use crate::optimizer::property::StreamKind;

        // Bind the input once so that the argument expression is not evaluated a second time
        // when the error message is built.
        let input = &$input;
        let kind = input.stream_kind();
        if let StreamKind::Upsert = kind {
            risingwave_common::bail!(
                "upsert stream is not supported as input of {}, plan:\n{}",
                $curr,
                input.explain_to_string()
            );
        }
        kind
    }};
}
pub(crate) use reject_upsert_input;