risingwave_pb/
stream_plan.rs

1// This file is @generated by prost-build.
/// Protobuf message wrapping a repeated list of `Dispatcher` messages.
///
/// Used as the map value type in `AddMutation::actor_dispatchers` and
/// `UpdateMutation::actor_new_dispatchers`.
2#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct Dispatchers {
    /// The wrapped dispatchers (proto field 1).
5    #[prost(message, repeated, tag = "1")]
6    pub dispatchers: ::prost::alloc::vec::Vec<Dispatcher>,
7}
/// Mutation carried by the `Add` variant of `barrier_mutation::Mutation`.
///
/// Note that the proto tags are not in declaration order: `added_actors` is
/// field 3 and `actor_splits` is field 2.
8#[derive(prost_helpers::AnyPB)]
9#[derive(Clone, PartialEq, ::prost::Message)]
10pub struct AddMutation {
11    /// New dispatchers for each actor.
    /// Keyed by a `u32`, presumably the actor id — confirm against the proto definition.
12    #[prost(map = "uint32, message", tag = "1")]
13    pub actor_dispatchers: ::std::collections::HashMap<u32, Dispatchers>,
14    /// All actors to be added (to the main connected component of the graph) in this update.
15    #[prost(uint32, repeated, tag = "3")]
16    pub added_actors: ::prost::alloc::vec::Vec<u32>,
17    /// We may embed a source change split mutation here.
18    /// `Source` and `SourceBackfill` are handled together here.
19    /// TODO: we may allow multiple mutations in a single barrier.
20    #[prost(map = "uint32, message", tag = "2")]
21    pub actor_splits: ::std::collections::HashMap<u32, super::source::ConnectorSplits>,
22    /// We may embed a pause mutation here.
23    /// TODO: we may allow multiple mutations in a single barrier.
24    #[prost(bool, tag = "4")]
25    pub pause: bool,
    /// `SubscriptionUpstreamInfo` entries to add (proto field 5).
    /// NOTE(review): when these take effect is not visible in this file — confirm.
26    #[prost(message, repeated, tag = "5")]
27    pub subscriptions_to_add: ::prost::alloc::vec::Vec<SubscriptionUpstreamInfo>,
28}
/// Mutation carried by the `Stop` variant of `barrier_mutation::Mutation`,
/// which is documented there as stopping a set of actors.
29#[derive(prost_helpers::AnyPB)]
30#[derive(Clone, PartialEq, ::prost::Message)]
31pub struct StopMutation {
    /// The actors this mutation refers to, as `u32` values (presumably actor ids — confirm).
32    #[prost(uint32, repeated, tag = "1")]
33    pub actors: ::prost::alloc::vec::Vec<u32>,
34}
/// Mutation carried by the `Update` variant of `barrier_mutation::Mutation`.
///
/// The nested update message types live in the `update_mutation` module.
35#[derive(prost_helpers::AnyPB)]
36#[derive(Clone, PartialEq, ::prost::Message)]
37pub struct UpdateMutation {
38    /// Dispatcher updates.
39    #[prost(message, repeated, tag = "1")]
40    pub dispatcher_update: ::prost::alloc::vec::Vec<update_mutation::DispatcherUpdate>,
41    /// Merge updates.
42    #[prost(message, repeated, tag = "2")]
43    pub merge_update: ::prost::alloc::vec::Vec<update_mutation::MergeUpdate>,
44    /// Vnode bitmap updates for each actor.
    /// The bitmap is carried as a `super::common::Buffer`; its encoding is not visible here.
45    #[prost(map = "uint32, message", tag = "3")]
46    pub actor_vnode_bitmap_update: ::std::collections::HashMap<
47        u32,
48        super::common::Buffer,
49    >,
50    /// All actors to be dropped in this update.
51    #[prost(uint32, repeated, tag = "4")]
52    pub dropped_actors: ::prost::alloc::vec::Vec<u32>,
53    /// Source updates.
54    /// `Source` and `SourceBackfill` are handled together here.
55    #[prost(map = "uint32, message", tag = "5")]
56    pub actor_splits: ::std::collections::HashMap<u32, super::source::ConnectorSplits>,
57    /// When modifying the Materialized View, we need to recreate the Dispatcher from the old upstream to the new TableFragment.
58    /// Consistent with the semantics in AddMutation.
59    #[prost(map = "uint32, message", tag = "6")]
60    pub actor_new_dispatchers: ::std::collections::HashMap<u32, Dispatchers>,
61}
62/// Nested message and enum types in `UpdateMutation`.
63pub mod update_mutation {
    /// Update for a single dispatcher, addressed by `actor_id` plus `dispatcher_id`.
64    #[derive(prost_helpers::AnyPB)]
65    #[derive(Clone, PartialEq, ::prost::Message)]
66    pub struct DispatcherUpdate {
67        /// Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
68        #[prost(uint32, tag = "1")]
69        pub actor_id: u32,
        /// Second half of the dispatcher key; note it is 64-bit while `actor_id` is 32-bit.
70        #[prost(uint64, tag = "2")]
71        pub dispatcher_id: u64,
72        /// The hash mapping for consistent hash.
73        /// For dispatcher types other than HASH, this is ignored.
74        #[prost(message, optional, tag = "3")]
75        pub hash_mapping: ::core::option::Option<super::ActorMapping>,
76        /// Added downstream actors.
77        #[prost(uint32, repeated, tag = "4")]
78        pub added_downstream_actor_id: ::prost::alloc::vec::Vec<u32>,
79        /// Removed downstream actors.
80        #[prost(uint32, repeated, tag = "5")]
81        pub removed_downstream_actor_id: ::prost::alloc::vec::Vec<u32>,
82    }
    /// Update for a single merge executor, addressed by `actor_id` plus `upstream_fragment_id`.
    ///
    /// Proto tags are not in declaration order: `new_upstream_fragment_id` is field 5.
83    #[derive(prost_helpers::AnyPB)]
84    #[derive(Clone, PartialEq, ::prost::Message)]
85    pub struct MergeUpdate {
86        /// Merge executor can be uniquely identified by a combination of actor id and upstream fragment id.
87        #[prost(uint32, tag = "1")]
88        pub actor_id: u32,
        /// Second half of the merge executor key.
89        #[prost(uint32, tag = "2")]
90        pub upstream_fragment_id: u32,
91        /// - For scaling, this is always `None`.
92        /// - For plan change, the upstream fragment will be changed to a new one, and this will be `Some`.
93        ///    In this case, all the upstream actors should be removed and replaced by the `new` ones.
94        #[prost(uint32, optional, tag = "5")]
95        pub new_upstream_fragment_id: ::core::option::Option<u32>,
96        /// Added upstream actors.
97        #[prost(uint32, repeated, tag = "3")]
98        pub added_upstream_actor_id: ::prost::alloc::vec::Vec<u32>,
99        /// Removed upstream actors.
100        /// Note: this is empty for replace job.
101        #[prost(uint32, repeated, tag = "4")]
102        pub removed_upstream_actor_id: ::prost::alloc::vec::Vec<u32>,
103    }
104}
/// Mutation carried by the `Splits` variant of `barrier_mutation::Mutation`
/// ("change the split of some sources").
///
/// The only field uses proto tag 2; no field with tag 1 is declared here.
105#[derive(prost_helpers::AnyPB)]
106#[derive(Clone, PartialEq, ::prost::Message)]
107pub struct SourceChangeSplitMutation {
108    /// `Source` and `SourceBackfill` are handled together here.
    /// Keyed by a `u32`, presumably the actor id — confirm against the proto definition.
109    #[prost(map = "uint32, message", tag = "2")]
110    pub actor_splits: ::std::collections::HashMap<u32, super::source::ConnectorSplits>,
111}
/// Field-less message carried by the `Pause` variant of `barrier_mutation::Mutation`.
112#[derive(prost_helpers::AnyPB)]
113#[derive(Clone, Copy, PartialEq, ::prost::Message)]
114pub struct PauseMutation {}
/// Field-less message carried by the `Resume` variant of `barrier_mutation::Mutation`.
115#[derive(prost_helpers::AnyPB)]
116#[derive(Clone, Copy, PartialEq, ::prost::Message)]
117pub struct ResumeMutation {}
/// Mutation carried by the `Throttle` variant of `barrier_mutation::Mutation`.
118#[derive(prost_helpers::AnyPB)]
119#[derive(Clone, PartialEq, ::prost::Message)]
120pub struct ThrottleMutation {
    /// Rate limit per `u32` key (presumably the actor id — confirm against the proto definition).
121    #[prost(map = "uint32, message", tag = "1")]
122    pub actor_throttle: ::std::collections::HashMap<u32, throttle_mutation::RateLimit>,
123}
124/// Nested message and enum types in `ThrottleMutation`.
125pub mod throttle_mutation {
    /// Message wrapper around an optional `u32` rate limit, used as the map
    /// value type of `ThrottleMutation::actor_throttle`.
126    #[derive(prost_helpers::AnyPB)]
127    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
128    pub struct RateLimit {
        /// `None` when the field is absent on the wire.
        /// NOTE(review): presumably `None` means "no limit" — confirm with the consumer.
129        #[prost(uint32, optional, tag = "1")]
130        pub rate_limit: ::core::option::Option<u32>,
131    }
132}
/// Mutation carried by the `Combined` variant of `barrier_mutation::Mutation`.
133#[derive(prost_helpers::AnyPB)]
134#[derive(Clone, PartialEq, ::prost::Message)]
135pub struct CombinedMutation {
    /// The nested mutations, each wrapped in a full `BarrierMutation`.
136    #[prost(message, repeated, tag = "1")]
137    pub mutations: ::prost::alloc::vec::Vec<BarrierMutation>,
138}
/// Pair of a subscriber id and an upstream table id.
///
/// Referenced by `AddMutation::subscriptions_to_add` and `DropSubscriptionsMutation::info`.
139#[derive(prost_helpers::AnyPB)]
140#[derive(Clone, Copy, PartialEq, ::prost::Message)]
141pub struct SubscriptionUpstreamInfo {
142    /// can either be subscription_id or table_id of creating TableFragments
143    #[prost(uint32, tag = "1")]
144    pub subscriber_id: u32,
    /// Presumably the table id of the upstream materialized view — confirm against the proto definition.
145    #[prost(uint32, tag = "2")]
146    pub upstream_mv_table_id: u32,
147}
/// Mutation carried by the `DropSubscriptions` variant of `barrier_mutation::Mutation`.
148#[derive(prost_helpers::AnyPB)]
149#[derive(Clone, PartialEq, ::prost::Message)]
150pub struct DropSubscriptionsMutation {
    /// Subscription entries this mutation refers to (presumably the ones to drop — confirm).
151    #[prost(message, repeated, tag = "1")]
152    pub info: ::prost::alloc::vec::Vec<SubscriptionUpstreamInfo>,
153}
/// Wrapper message around the `barrier_mutation::Mutation` oneof.
154#[derive(prost_helpers::AnyPB)]
155#[derive(Clone, PartialEq, ::prost::Message)]
156pub struct BarrierMutation {
    /// The selected mutation; `None` when none of the oneof fields is set.
    /// The listed tags must stay in sync with the variants of `barrier_mutation::Mutation`.
157    #[prost(
158        oneof = "barrier_mutation::Mutation",
159        tags = "3, 4, 5, 6, 7, 8, 10, 12, 100"
160    )]
161    pub mutation: ::core::option::Option<barrier_mutation::Mutation>,
162}
163/// Nested message and enum types in `BarrierMutation`.
164pub mod barrier_mutation {
    /// The oneof of mutation kinds a `BarrierMutation` can carry.
    ///
    /// Each variant's tag appears in the `tags` list on `BarrierMutation::mutation`.
    /// Tags 9 and 11 are not used by any variant here.
165    #[derive(prost_helpers::AnyPB)]
166    #[derive(Clone, PartialEq, ::prost::Oneof)]
167    pub enum Mutation {
168        /// Add new dispatchers to some actors, used for creating materialized views.
169        #[prost(message, tag = "3")]
170        Add(super::AddMutation),
171        /// Stop a set of actors, used for dropping materialized views. Empty dispatchers will be
172        /// automatically removed.
173        #[prost(message, tag = "4")]
174        Stop(super::StopMutation),
175        /// Update outputs and hash mappings for some dispatchers, used for scaling and replace table.
176        #[prost(message, tag = "5")]
177        Update(super::UpdateMutation),
178        /// Change the split of some sources.
179        #[prost(message, tag = "6")]
180        Splits(super::SourceChangeSplitMutation),
181        /// Pause the dataflow of the whole streaming graph, only used for scaling.
182        #[prost(message, tag = "7")]
183        Pause(super::PauseMutation),
184        /// Resume the dataflow of the whole streaming graph, only used for scaling.
185        #[prost(message, tag = "8")]
186        Resume(super::ResumeMutation),
187        /// Throttle specific source exec or backfill exec.
188        #[prost(message, tag = "10")]
189        Throttle(super::ThrottleMutation),
190        /// Drop subscription on mv
191        #[prost(message, tag = "12")]
192        DropSubscriptions(super::DropSubscriptionsMutation),
193        /// Combined mutation.
194        /// Currently, it can only be Add & Update, which is for sink into table.
195        #[prost(message, tag = "100")]
196        Combined(super::CombinedMutation),
197    }
198}
/// A barrier message: an epoch, an optional mutation, a tracing context,
/// a kind, and a debugging record of the actors it has passed.
///
/// Proto tags are not in declaration order (`mutation` is field 3, `tracing_context` is field 2).
199#[derive(prost_helpers::AnyPB)]
200#[derive(Clone, PartialEq, ::prost::Message)]
201pub struct Barrier {
    /// Epoch of this barrier, as a `super::data::Epoch` message; `None` when absent on the wire.
202    #[prost(message, optional, tag = "1")]
203    pub epoch: ::core::option::Option<super::data::Epoch>,
    /// Mutation attached to this barrier, if any.
204    #[prost(message, optional, tag = "3")]
205    pub mutation: ::core::option::Option<BarrierMutation>,
206    /// Used for tracing.
207    #[prost(map = "string, string", tag = "2")]
208    pub tracing_context: ::std::collections::HashMap<
209        ::prost::alloc::string::String,
210        ::prost::alloc::string::String,
211    >,
212    /// The kind of the barrier.
    /// Stored as the raw `i32` value of `barrier::BarrierKind`.
213    #[prost(enumeration = "barrier::BarrierKind", tag = "9")]
214    pub kind: i32,
215    /// Record the actors that the barrier has passed. Only used for debugging.
216    #[prost(uint32, repeated, tag = "255")]
217    pub passed_actors: ::prost::alloc::vec::Vec<u32>,
218}
219/// Nested message and enum types in `Barrier`.
220pub mod barrier {
    /// Kind of a `Barrier`; `#[repr(i32)]` so it maps onto the `i32` stored in `Barrier::kind`.
221    #[derive(prost_helpers::AnyPB)]
222    #[derive(::enum_as_inner::EnumAsInner)]
223    #[derive(
224        Clone,
225        Copy,
226        Debug,
227        PartialEq,
228        Eq,
229        Hash,
230        PartialOrd,
231        Ord,
232        ::prost::Enumeration
233    )]
234    #[repr(i32)]
235    pub enum BarrierKind {
        /// The zero value (`BARRIER_KIND_UNSPECIFIED`).
236        Unspecified = 0,
237        /// The first barrier after a fresh start or recovery.
238        /// There will be no data associated with the previous epoch of the barrier.
239        Initial = 1,
240        /// A normal barrier. Data should be flushed locally.
241        Barrier = 2,
242        /// A checkpoint barrier. Data should be synchronized to the shared storage.
243        Checkpoint = 3,
244    }
245    impl BarrierKind {
246        /// String value of the enum field names used in the ProtoBuf definition.
247        ///
248        /// The values are not transformed in any way and thus are considered stable
249        /// (if the ProtoBuf definition does not change) and safe for programmatic use.
250        pub fn as_str_name(&self) -> &'static str {
251            match self {
252                Self::Unspecified => "BARRIER_KIND_UNSPECIFIED",
253                Self::Initial => "BARRIER_KIND_INITIAL",
254                Self::Barrier => "BARRIER_KIND_BARRIER",
255                Self::Checkpoint => "BARRIER_KIND_CHECKPOINT",
256            }
257        }
258        /// Creates an enum from field names used in the ProtoBuf definition.
        ///
        /// Matching is exact; returns `None` when `value` is not one of the four
        /// names produced by `as_str_name`.
259        pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
260            match value {
261                "BARRIER_KIND_UNSPECIFIED" => Some(Self::Unspecified),
262                "BARRIER_KIND_INITIAL" => Some(Self::Initial),
263                "BARRIER_KIND_BARRIER" => Some(Self::Barrier),
264                "BARRIER_KIND_CHECKPOINT" => Some(Self::Checkpoint),
265                _ => None,
266            }
267        }
268    }
269}
/// A watermark on one column of a stream.
///
/// Fields use proto tags 1 and 3; no field with tag 2 is declared here.
270#[derive(prost_helpers::AnyPB)]
271#[derive(Clone, PartialEq, ::prost::Message)]
272pub struct Watermark {
273    /// The reference to the watermark column in the stream's schema.
274    #[prost(message, optional, tag = "1")]
275    pub column: ::core::option::Option<super::expr::InputRef>,
276    /// The watermark value, there will be no record having a greater value in the watermark column.
277    #[prost(message, optional, tag = "3")]
278    pub val: ::core::option::Option<super::data::Datum>,
279}
/// Wrapper message around the `stream_message::StreamMessage` oneof.
280#[derive(prost_helpers::AnyPB)]
281#[derive(Clone, PartialEq, ::prost::Message)]
282pub struct StreamMessage {
    /// The selected payload; `None` when none of the oneof fields is set.
283    #[prost(oneof = "stream_message::StreamMessage", tags = "1, 2, 3")]
284    pub stream_message: ::core::option::Option<stream_message::StreamMessage>,
285}
286/// Nested message and enum types in `StreamMessage`.
287pub mod stream_message {
    /// The oneof payload of a `StreamMessage`: a data chunk, a barrier, or a watermark.
288    #[derive(prost_helpers::AnyPB)]
289    #[derive(Clone, PartialEq, ::prost::Oneof)]
290    pub enum StreamMessage {
        /// A `data::StreamChunk` payload (proto field 1).
291        #[prost(message, tag = "1")]
292        StreamChunk(super::super::data::StreamChunk),
        /// A single `Barrier` (proto field 2).
293        #[prost(message, tag = "2")]
294        Barrier(super::Barrier),
        /// A `Watermark` (proto field 3).
295        #[prost(message, tag = "3")]
296        Watermark(super::Watermark),
297    }
298}
/// Wrapper message around the `stream_message_batch::StreamMessageBatch` oneof.
///
/// Same shape as `StreamMessage`, except the barrier alternative is a `BarrierBatch`.
299#[derive(prost_helpers::AnyPB)]
300#[derive(Clone, PartialEq, ::prost::Message)]
301pub struct StreamMessageBatch {
    /// The selected payload; `None` when none of the oneof fields is set.
302    #[prost(oneof = "stream_message_batch::StreamMessageBatch", tags = "1, 2, 3")]
303    pub stream_message_batch: ::core::option::Option<
304        stream_message_batch::StreamMessageBatch,
305    >,
306}
307/// Nested message and enum types in `StreamMessageBatch`.
308pub mod stream_message_batch {
    /// A list of `Barrier` messages carried as one payload.
309    #[derive(prost_helpers::AnyPB)]
310    #[derive(Clone, PartialEq, ::prost::Message)]
311    pub struct BarrierBatch {
        /// The batched barriers (proto field 1).
312        #[prost(message, repeated, tag = "1")]
313        pub barriers: ::prost::alloc::vec::Vec<super::Barrier>,
314    }
    /// The oneof payload of a `StreamMessageBatch`: a data chunk, a batch of barriers, or a watermark.
315    #[derive(prost_helpers::AnyPB)]
316    #[derive(Clone, PartialEq, ::prost::Oneof)]
317    pub enum StreamMessageBatch {
        /// A `data::StreamChunk` payload (proto field 1).
318        #[prost(message, tag = "1")]
319        StreamChunk(super::super::data::StreamChunk),
        /// A `BarrierBatch` (proto field 2).
320        #[prost(message, tag = "2")]
321        BarrierBatch(BarrierBatch),
        /// A `Watermark` (proto field 3).
322        #[prost(message, tag = "3")]
323        Watermark(super::Watermark),
324    }
325}
326/// Hash mapping for compute node. Stores mapping from virtual node to actor id.
///
/// NOTE(review): how `original_indices` and `data` together encode the mapping
/// is not visible in this file — confirm against the proto definition or the
/// code that builds/reads this message.
327#[derive(prost_helpers::AnyPB)]
328#[derive(Clone, PartialEq, ::prost::Message)]
329pub struct ActorMapping {
    /// Repeated `u32` values (proto field 1).
330    #[prost(uint32, repeated, tag = "1")]
331    pub original_indices: ::prost::alloc::vec::Vec<u32>,
    /// Repeated `u32` values (proto field 2); presumably actor ids — confirm.
332    #[prost(uint32, repeated, tag = "2")]
333    pub data: ::prost::alloc::vec::Vec<u32>,
334}
/// Description of a stream source, embedded in `SourceNode::source_inner`.
///
/// No field with proto tag 5 is declared here.
/// `with_properties` and `secret_refs` are generated as `BTreeMap`s rather than `HashMap`s.
335#[derive(prost_helpers::AnyPB)]
336#[derive(Clone, PartialEq, ::prost::Message)]
337pub struct StreamSource {
    /// Id of the source.
338    #[prost(uint32, tag = "1")]
339    pub source_id: u32,
    /// Catalog table for this source's state; `None` when absent on the wire.
340    #[prost(message, optional, tag = "2")]
341    pub state_table: ::core::option::Option<super::catalog::Table>,
    /// Optional index of the row-id column (presumably into `columns` — confirm).
342    #[prost(uint32, optional, tag = "3")]
343    pub row_id_index: ::core::option::Option<u32>,
    /// Column catalogs of the source.
344    #[prost(message, repeated, tag = "4")]
345    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnCatalog>,
    /// String-to-string properties (presumably the `WITH (...)` options — confirm).
346    #[prost(btree_map = "string, string", tag = "6")]
347    pub with_properties: ::prost::alloc::collections::BTreeMap<
348        ::prost::alloc::string::String,
349        ::prost::alloc::string::String,
350    >,
    /// Additional source info, as a `catalog::StreamSourceInfo` message.
351    #[prost(message, optional, tag = "7")]
352    pub info: ::core::option::Option<super::catalog::StreamSourceInfo>,
    /// Name of the source.
353    #[prost(string, tag = "8")]
354    pub source_name: ::prost::alloc::string::String,
355    /// Source rate limit
356    #[prost(uint32, optional, tag = "9")]
357    pub rate_limit: ::core::option::Option<u32>,
    /// Secret references keyed by string (presumably by property name — confirm).
358    #[prost(btree_map = "string, message", tag = "10")]
359    pub secret_refs: ::prost::alloc::collections::BTreeMap<
360        ::prost::alloc::string::String,
361        super::secret::SecretRef,
362    >,
363}
364/// copy contents from StreamSource to prevent compatibility issues in the future
///
/// As declared here, the fields, types and proto tags are the same as in
/// `StreamSource`; the two messages are kept separate so they can evolve
/// independently. Embedded in `StreamFsFetchNode::node_inner`.
365#[derive(prost_helpers::AnyPB)]
366#[derive(Clone, PartialEq, ::prost::Message)]
367pub struct StreamFsFetch {
    /// Id of the source.
368    #[prost(uint32, tag = "1")]
369    pub source_id: u32,
    /// Catalog table for state; `None` when absent on the wire.
370    #[prost(message, optional, tag = "2")]
371    pub state_table: ::core::option::Option<super::catalog::Table>,
    /// Optional index of the row-id column (presumably into `columns` — confirm).
372    #[prost(uint32, optional, tag = "3")]
373    pub row_id_index: ::core::option::Option<u32>,
    /// Column catalogs of the source.
374    #[prost(message, repeated, tag = "4")]
375    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnCatalog>,
    /// String-to-string properties (presumably the `WITH (...)` options — confirm).
376    #[prost(btree_map = "string, string", tag = "6")]
377    pub with_properties: ::prost::alloc::collections::BTreeMap<
378        ::prost::alloc::string::String,
379        ::prost::alloc::string::String,
380    >,
    /// Additional source info, as a `catalog::StreamSourceInfo` message.
381    #[prost(message, optional, tag = "7")]
382    pub info: ::core::option::Option<super::catalog::StreamSourceInfo>,
    /// Name of the source.
383    #[prost(string, tag = "8")]
384    pub source_name: ::prost::alloc::string::String,
385    /// Source rate limit
386    #[prost(uint32, optional, tag = "9")]
387    pub rate_limit: ::core::option::Option<u32>,
    /// Secret references keyed by string (presumably by property name — confirm).
388    #[prost(btree_map = "string, message", tag = "10")]
389    pub secret_refs: ::prost::alloc::collections::BTreeMap<
390        ::prost::alloc::string::String,
391        super::secret::SecretRef,
392    >,
393}
394/// The executor only for receiving barrier from the meta service. It always resides in the leaves
395/// of the streaming graph.
///
/// The message declares no fields.
396#[derive(prost_helpers::AnyPB)]
397#[derive(Clone, Copy, PartialEq, ::prost::Message)]
398pub struct BarrierRecvNode {}
/// Plan node for a source; wraps an optional `StreamSource`.
399#[derive(prost_helpers::AnyPB)]
400#[derive(Clone, PartialEq, ::prost::Message)]
401pub struct SourceNode {
402    /// The source node can contain either a stream source or nothing. So here we extract all
403    /// information about stream source to a message, and here it will be an `Option` in Rust.
404    #[prost(message, optional, tag = "1")]
405    pub source_inner: ::core::option::Option<StreamSource>,
406}
/// Plan node wrapping an optional `StreamFsFetch`, mirroring how `SourceNode`
/// wraps `StreamSource`.
407#[derive(prost_helpers::AnyPB)]
408#[derive(Clone, PartialEq, ::prost::Message)]
409pub struct StreamFsFetchNode {
    /// The wrapped description; `None` when absent on the wire.
410    #[prost(message, optional, tag = "1")]
411    pub node_inner: ::core::option::Option<StreamFsFetch>,
412}
413/// Its input must be a `MergeNode`, which connects to the upstream source job.
414/// See `StreamSourceScan::adhoc_to_stream_prost` for the plan.
415#[derive(prost_helpers::AnyPB)]
416#[derive(Clone, PartialEq, ::prost::Message)]
417pub struct SourceBackfillNode {
    /// Id of the upstream source being backfilled from.
418    #[prost(uint32, tag = "1")]
419    pub upstream_source_id: u32,
    /// Optional index of the row-id column (presumably into `columns` — confirm).
420    #[prost(uint32, optional, tag = "2")]
421    pub row_id_index: ::core::option::Option<u32>,
    /// Column catalogs of the source.
422    #[prost(message, repeated, tag = "3")]
423    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnCatalog>,
    /// Additional source info, as a `catalog::StreamSourceInfo` message.
424    #[prost(message, optional, tag = "4")]
425    pub info: ::core::option::Option<super::catalog::StreamSourceInfo>,
    /// Name of the source.
426    #[prost(string, tag = "5")]
427    pub source_name: ::prost::alloc::string::String,
    /// String-to-string properties (presumably the `WITH (...)` options — confirm).
428    #[prost(btree_map = "string, string", tag = "6")]
429    pub with_properties: ::prost::alloc::collections::BTreeMap<
430        ::prost::alloc::string::String,
431        ::prost::alloc::string::String,
432    >,
433    /// Backfill rate limit
434    #[prost(uint32, optional, tag = "7")]
435    pub rate_limit: ::core::option::Option<u32>,
436    /// `| partition_id | backfill_progress |`
437    #[prost(message, optional, tag = "8")]
438    pub state_table: ::core::option::Option<super::catalog::Table>,
    /// Secret references keyed by string (presumably by property name — confirm).
439    #[prost(btree_map = "string, message", tag = "9")]
440    pub secret_refs: ::prost::alloc::collections::BTreeMap<
441        ::prost::alloc::string::String,
442        super::secret::SecretRef,
443    >,
444}
/// Description of a sink, embedded in `SinkNode::sink_desc`.
///
/// No field with proto tag 4 is declared here.
445#[derive(prost_helpers::AnyPB)]
446#[derive(Clone, PartialEq, ::prost::Message)]
447pub struct SinkDesc {
    /// Id of the sink.
448    #[prost(uint32, tag = "1")]
449    pub id: u32,
    /// Name of the sink.
450    #[prost(string, tag = "2")]
451    pub name: ::prost::alloc::string::String,
    /// Definition text of the sink (presumably the SQL statement — confirm).
452    #[prost(string, tag = "3")]
453    pub definition: ::prost::alloc::string::String,
    /// Primary key of the plan, as column orders.
454    #[prost(message, repeated, tag = "5")]
455    pub plan_pk: ::prost::alloc::vec::Vec<super::common::ColumnOrder>,
    /// Primary key on the downstream side, as `u32` values (presumably column indices — confirm).
456    #[prost(uint32, repeated, tag = "6")]
457    pub downstream_pk: ::prost::alloc::vec::Vec<u32>,
    /// Distribution key, as `u32` values (presumably column indices — confirm).
458    #[prost(uint32, repeated, tag = "7")]
459    pub distribution_key: ::prost::alloc::vec::Vec<u32>,
    /// String-to-string sink properties.
460    #[prost(btree_map = "string, string", tag = "8")]
461    pub properties: ::prost::alloc::collections::BTreeMap<
462        ::prost::alloc::string::String,
463        ::prost::alloc::string::String,
464    >,
465    /// to be deprecated
    /// Stored as the raw `i32` value of `catalog::SinkType`.
466    #[prost(enumeration = "super::catalog::SinkType", tag = "9")]
467    pub sink_type: i32,
    /// Column catalogs of the sink.
468    #[prost(message, repeated, tag = "10")]
469    pub column_catalogs: ::prost::alloc::vec::Vec<super::plan_common::ColumnCatalog>,
    /// Database name.
470    #[prost(string, tag = "11")]
471    pub db_name: ::prost::alloc::string::String,
472    /// If the sink is from table or mv, this is name of the table/mv. Otherwise
473    /// it is the name of the sink itself.
474    #[prost(string, tag = "12")]
475    pub sink_from_name: ::prost::alloc::string::String,
    /// Optional format description, as a `catalog::SinkFormatDesc` message.
476    #[prost(message, optional, tag = "13")]
477    pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
    /// Optional target table (presumably a table id, set for sink-into-table — confirm).
478    #[prost(uint32, optional, tag = "14")]
479    pub target_table: ::core::option::Option<u32>,
    /// Optional index of an extra partition column; note it is `u64`, unlike the other indices here.
480    #[prost(uint64, optional, tag = "15")]
481    pub extra_partition_col_idx: ::core::option::Option<u64>,
    /// Secret references keyed by string (presumably by property name — confirm).
482    #[prost(btree_map = "string, message", tag = "16")]
483    pub secret_refs: ::prost::alloc::collections::BTreeMap<
484        ::prost::alloc::string::String,
485        super::secret::SecretRef,
486    >,
487}
/// Plan node for a sink.
488#[derive(prost_helpers::AnyPB)]
489#[derive(Clone, PartialEq, ::prost::Message)]
490pub struct SinkNode {
    /// Description of the sink; `None` when absent on the wire.
491    #[prost(message, optional, tag = "1")]
492    pub sink_desc: ::core::option::Option<SinkDesc>,
493    /// A sink with a kv log store should have a table.
494    #[prost(message, optional, tag = "2")]
495    pub table: ::core::option::Option<super::catalog::Table>,
    /// Stored as the raw `i32` value of `SinkLogStoreType` (defined outside this view).
496    #[prost(enumeration = "SinkLogStoreType", tag = "3")]
497    pub log_store_type: i32,
    /// Optional rate limit for the sink; units are not visible here — confirm.
498    #[prost(uint32, optional, tag = "4")]
499    pub rate_limit: ::core::option::Option<u32>,
500}
/// Plan node for a projection.
501#[derive(prost_helpers::AnyPB)]
502#[derive(Clone, PartialEq, ::prost::Message)]
503pub struct ProjectNode {
    /// The projected expressions.
504    #[prost(message, repeated, tag = "1")]
505    pub select_list: ::prost::alloc::vec::Vec<super::expr::ExprNode>,
506    /// These two fields express a list of usize pairs, which means when project receives a
507    /// watermark with `watermark_input_cols\[i\]` column index, it should derive a new watermark
508    /// with `watermark_output_cols\[i\]`th expression
509    #[prost(uint32, repeated, tag = "2")]
510    pub watermark_input_cols: ::prost::alloc::vec::Vec<u32>,
    /// Second half of the pair list described on `watermark_input_cols`.
511    #[prost(uint32, repeated, tag = "3")]
512    pub watermark_output_cols: ::prost::alloc::vec::Vec<u32>,
    /// `u32` values, presumably indices into `select_list` of non-decreasing expressions — confirm.
513    #[prost(uint32, repeated, tag = "4")]
514    pub nondecreasing_exprs: ::prost::alloc::vec::Vec<u32>,
515    /// Whether there are likely no-op updates in the output chunks, so that eliminating them with
516    /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
517    #[prost(bool, tag = "5")]
518    pub noop_update_hint: bool,
519}
/// Plan node for a filter.
520#[derive(prost_helpers::AnyPB)]
521#[derive(Clone, PartialEq, ::prost::Message)]
522pub struct FilterNode {
    /// The filter predicate as an expression tree; `None` when absent on the wire.
523    #[prost(message, optional, tag = "1")]
524    pub search_condition: ::core::option::Option<super::expr::ExprNode>,
525}
/// Plan node for a changelog operator; carries a single flag.
526#[derive(prost_helpers::AnyPB)]
527#[derive(Clone, Copy, PartialEq, ::prost::Message)]
528pub struct ChangeLogNode {
529    /// Whether or not there is an op in the final output.
530    #[prost(bool, tag = "1")]
531    pub need_op: bool,
532}
/// Plan node for a CDC filter: a predicate plus the id of the upstream source.
533#[derive(prost_helpers::AnyPB)]
534#[derive(Clone, PartialEq, ::prost::Message)]
535pub struct CdcFilterNode {
    /// The filter predicate as an expression tree; `None` when absent on the wire.
536    #[prost(message, optional, tag = "1")]
537    pub search_condition: ::core::option::Option<super::expr::ExprNode>,
    /// Id of the upstream source.
538    #[prost(uint32, tag = "2")]
539    pub upstream_source_id: u32,
540}
541/// A materialized view is regarded as a table.
542/// In addition, we also specify primary key to MV for efficient point lookup during update and deletion.
543///
544/// The node will be used for both create mv and create index.
545/// - When creating mv, `pk == distribution_key == column_orders`.
546/// - When creating index, `column_orders` will contain both
547///    arrange columns and pk columns, while distribution key will be arrange columns.
548#[derive(prost_helpers::AnyPB)]
549#[derive(Clone, PartialEq, ::prost::Message)]
550pub struct MaterializeNode {
    /// Id of the materialized table.
551    #[prost(uint32, tag = "1")]
552    pub table_id: u32,
553    /// Column indexes and orders of primary key.
554    #[prost(message, repeated, tag = "2")]
555    pub column_orders: ::prost::alloc::vec::Vec<super::common::ColumnOrder>,
556    /// Used for internal table states.
557    #[prost(message, optional, tag = "3")]
558    pub table: ::core::option::Option<super::catalog::Table>,
559}
/// Wrapper message around the `agg_call_state::Inner` oneof.
///
/// Listed in `SimpleAggNode::agg_call_states` and `HashAggNode::agg_call_states`.
560#[derive(prost_helpers::AnyPB)]
561#[derive(Clone, PartialEq, ::prost::Message)]
562pub struct AggCallState {
    /// The selected state kind; `None` when none of the oneof fields is set.
    /// Only tags 1 and 3 are used.
563    #[prost(oneof = "agg_call_state::Inner", tags = "1, 3")]
564    pub inner: ::core::option::Option<agg_call_state::Inner>,
565}
566/// Nested message and enum types in `AggCallState`.
567pub mod agg_call_state {
568    /// The state is stored in the intermediate state table. Used for count/sum/append-only extreme.
569    #[derive(prost_helpers::AnyPB)]
570    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
571    pub struct ValueState {}
572    /// Use some columns of the upstream's materialization as the AggCall's state. Used for extreme/string_agg/array_agg.
573    #[derive(prost_helpers::AnyPB)]
574    #[derive(Clone, PartialEq, ::prost::Message)]
575    pub struct MaterializedInputState {
        /// Catalog table backing this state; `None` when absent on the wire.
576        #[prost(message, optional, tag = "1")]
577        pub table: ::core::option::Option<super::super::catalog::Table>,
578        /// for constructing state table column mapping
579        #[prost(uint32, repeated, tag = "2")]
580        pub included_upstream_indices: ::prost::alloc::vec::Vec<u32>,
        /// `u32` values, presumably indices of the value columns within `table` — confirm.
581        #[prost(uint32, repeated, tag = "3")]
582        pub table_value_indices: ::prost::alloc::vec::Vec<u32>,
        /// Ordering columns, as column orders.
583        #[prost(message, repeated, tag = "4")]
584        pub order_columns: ::prost::alloc::vec::Vec<super::super::common::ColumnOrder>,
585    }
    /// The oneof of state kinds an `AggCallState` can carry. No variant uses tag 2.
586    #[derive(prost_helpers::AnyPB)]
587    #[derive(Clone, PartialEq, ::prost::Oneof)]
588    pub enum Inner {
        /// See `ValueState` (proto field 1).
589        #[prost(message, tag = "1")]
590        ValueState(ValueState),
        /// See `MaterializedInputState` (proto field 3).
591        #[prost(message, tag = "3")]
592        MaterializedInputState(MaterializedInputState),
593    }
594}
/// Plan node for a simple (non-grouped) aggregation.
///
/// Shares most fields with `HashAggNode`, but has no `group_key` and the
/// proto tags differ between the two messages.
595#[derive(prost_helpers::AnyPB)]
596#[derive(Clone, PartialEq, ::prost::Message)]
597pub struct SimpleAggNode {
    /// The aggregate calls to evaluate.
598    #[prost(message, repeated, tag = "1")]
599    pub agg_calls: ::prost::alloc::vec::Vec<super::expr::AggCall>,
600    /// Only used for stateless simple agg.
601    #[prost(uint32, repeated, tag = "2")]
602    pub distribution_key: ::prost::alloc::vec::Vec<u32>,
    /// State descriptions; presumably one per entry of `agg_calls`, in the same order — confirm.
603    #[prost(message, repeated, tag = "3")]
604    pub agg_call_states: ::prost::alloc::vec::Vec<AggCallState>,
    /// Catalog table for the intermediate state; `None` when absent on the wire.
605    #[prost(message, optional, tag = "4")]
606    pub intermediate_state_table: ::core::option::Option<super::catalog::Table>,
607    /// Whether to optimize for append only stream.
608    /// It is true when the input is append-only
609    #[prost(bool, tag = "5")]
610    pub is_append_only: bool,
    /// Dedup tables for distinct aggregates, keyed by a `u32` (presumably the distinct column index — confirm).
611    #[prost(map = "uint32, message", tag = "6")]
612    pub distinct_dedup_tables: ::std::collections::HashMap<u32, super::catalog::Table>,
    /// Index of the row-count aggregate (presumably into `agg_calls` — confirm).
613    #[prost(uint32, tag = "7")]
614    pub row_count_index: u32,
    /// Stored as the raw `i32` value of `AggNodeVersion` (defined outside this view).
615    #[prost(enumeration = "AggNodeVersion", tag = "8")]
616    pub version: i32,
617    /// Required by the downstream `RowMergeNode`,
618    /// currently only used by the `approx_percentile`'s two phase plan
619    #[prost(bool, tag = "9")]
620    pub must_output_per_barrier: bool,
621}
/// Plan node for a grouped (hash) aggregation.
///
/// Shares most fields with `SimpleAggNode`, but the proto tags differ
/// between the two messages (e.g. `version` is field 9 here).
622#[derive(prost_helpers::AnyPB)]
623#[derive(Clone, PartialEq, ::prost::Message)]
624pub struct HashAggNode {
    /// Group key, as `u32` values (presumably input column indices — confirm).
625    #[prost(uint32, repeated, tag = "1")]
626    pub group_key: ::prost::alloc::vec::Vec<u32>,
    /// The aggregate calls to evaluate.
627    #[prost(message, repeated, tag = "2")]
628    pub agg_calls: ::prost::alloc::vec::Vec<super::expr::AggCall>,
    /// State descriptions; presumably one per entry of `agg_calls`, in the same order — confirm.
629    #[prost(message, repeated, tag = "3")]
630    pub agg_call_states: ::prost::alloc::vec::Vec<AggCallState>,
    /// Catalog table for the intermediate state; `None` when absent on the wire.
631    #[prost(message, optional, tag = "4")]
632    pub intermediate_state_table: ::core::option::Option<super::catalog::Table>,
633    /// Whether to optimize for append only stream.
634    /// It is true when the input is append-only
635    #[prost(bool, tag = "5")]
636    pub is_append_only: bool,
    /// Dedup tables for distinct aggregates, keyed by a `u32` (presumably the distinct column index — confirm).
637    #[prost(map = "uint32, message", tag = "6")]
638    pub distinct_dedup_tables: ::std::collections::HashMap<u32, super::catalog::Table>,
    /// Index of the row-count aggregate (presumably into `agg_calls` — confirm).
639    #[prost(uint32, tag = "7")]
640    pub row_count_index: u32,
    /// Flag for emit-on-window-close output; exact semantics are not visible here — confirm.
641    #[prost(bool, tag = "8")]
642    pub emit_on_window_close: bool,
    /// Stored as the raw `i32` value of `AggNodeVersion` (defined outside this view).
643    #[prost(enumeration = "AggNodeVersion", tag = "9")]
644    pub version: i32,
645}
/// Plan node for a (non-grouped) top-N.
646#[derive(prost_helpers::AnyPB)]
647#[derive(Clone, PartialEq, ::prost::Message)]
648pub struct TopNNode {
649    /// 0 means no limit as limit of 0 means this node should be optimized away
650    #[prost(uint64, tag = "1")]
651    pub limit: u64,
    /// Offset applied together with `limit` (presumably the number of leading rows to skip — confirm).
652    #[prost(uint64, tag = "2")]
653    pub offset: u64,
    /// Catalog table for this node's state; `None` when absent on the wire.
654    #[prost(message, optional, tag = "3")]
655    pub table: ::core::option::Option<super::catalog::Table>,
    /// Ordering of the rows, as column orders.
656    #[prost(message, repeated, tag = "4")]
657    pub order_by: ::prost::alloc::vec::Vec<super::common::ColumnOrder>,
    /// Flag for `WITH TIES` behaviour; exact semantics are not visible here — confirm.
658    #[prost(bool, tag = "5")]
659    pub with_ties: bool,
660}
/// Plan node for a grouped top-N.
///
/// Same fields as `TopNNode` plus `group_key`; the proto tags after
/// `offset` are shifted by one relative to `TopNNode`.
661#[derive(prost_helpers::AnyPB)]
662#[derive(Clone, PartialEq, ::prost::Message)]
663pub struct GroupTopNNode {
664    /// 0 means no limit as limit of 0 means this node should be optimized away
665    #[prost(uint64, tag = "1")]
666    pub limit: u64,
    /// Offset applied together with `limit` (presumably the number of leading rows to skip — confirm).
667    #[prost(uint64, tag = "2")]
668    pub offset: u64,
    /// Group key, as `u32` values (presumably input column indices — confirm).
669    #[prost(uint32, repeated, tag = "3")]
670    pub group_key: ::prost::alloc::vec::Vec<u32>,
    /// Catalog table for this node's state; `None` when absent on the wire.
671    #[prost(message, optional, tag = "4")]
672    pub table: ::core::option::Option<super::catalog::Table>,
    /// Ordering of the rows, as column orders.
673    #[prost(message, repeated, tag = "5")]
674    pub order_by: ::prost::alloc::vec::Vec<super::common::ColumnOrder>,
    /// Flag for `WITH TIES` behaviour; exact semantics are not visible here — confirm.
675    #[prost(bool, tag = "6")]
676    pub with_ties: bool,
677}
/// An expression type paired with an expression; used as
/// `InequalityPair::delta_expression`.
678#[derive(prost_helpers::AnyPB)]
679#[derive(Clone, PartialEq, ::prost::Message)]
680pub struct DeltaExpression {
    /// Stored as the raw `i32` value of `expr::expr_node::Type`.
681    #[prost(enumeration = "super::expr::expr_node::Type", tag = "1")]
682    pub delta_type: i32,
    /// The delta expression tree; `None` when absent on the wire.
683    #[prost(message, optional, tag = "2")]
684    pub delta: ::core::option::Option<super::expr::ExprNode>,
685}
/// One inequality condition between two input columns, of the form
/// `greater >= less + delta_expression`.
686#[derive(prost_helpers::AnyPB)]
687#[derive(Clone, PartialEq, ::prost::Message)]
688pub struct InequalityPair {
689    /// Input index of greater side of inequality.
690    #[prost(uint32, tag = "1")]
691    pub key_required_larger: u32,
692    /// Input index of less side of inequality.
693    #[prost(uint32, tag = "2")]
694    pub key_required_smaller: u32,
695    /// Whether this condition is used to clean state table of `HashJoinExecutor`.
696    #[prost(bool, tag = "3")]
697    pub clean_state: bool,
698    /// greater >= less + delta_expression, if `None`, it represents that greater >= less
699    #[prost(message, optional, tag = "4")]
700    pub delta_expression: ::core::option::Option<DeltaExpression>,
701}
/// Plan node for a hash join: the join type, the join keys of both sides, an
/// optional extra `condition`, inequality pairs, and the internal state tables
/// (one table and one degree table per side).
702#[derive(prost_helpers::AnyPB)]
703#[derive(Clone, PartialEq, ::prost::Message)]
704pub struct HashJoinNode {
    /// Raw `i32` value of `super::plan_common::JoinType`.
705    #[prost(enumeration = "super::plan_common::JoinType", tag = "1")]
706    pub join_type: i32,
    /// Join key columns of the left input (presumably paired position-wise with `right_key` — confirm).
707    #[prost(int32, repeated, tag = "2")]
708    pub left_key: ::prost::alloc::vec::Vec<i32>,
    /// Join key columns of the right input.
709    #[prost(int32, repeated, tag = "3")]
710    pub right_key: ::prost::alloc::vec::Vec<i32>,
    /// Optional additional join condition expression.
711    #[prost(message, optional, tag = "4")]
712    pub condition: ::core::option::Option<super::expr::ExprNode>,
    /// Inequality conditions of the join; see `InequalityPair`.
713    #[prost(message, repeated, tag = "5")]
714    pub inequality_pairs: ::prost::alloc::vec::Vec<InequalityPair>,
715    /// Used for internal table states.
716    #[prost(message, optional, tag = "6")]
717    pub left_table: ::core::option::Option<super::catalog::Table>,
718    /// Used for internal table states.
719    #[prost(message, optional, tag = "7")]
720    pub right_table: ::core::option::Option<super::catalog::Table>,
721    /// Used for internal table states.
722    #[prost(message, optional, tag = "8")]
723    pub left_degree_table: ::core::option::Option<super::catalog::Table>,
724    /// Used for internal table states.
725    #[prost(message, optional, tag = "9")]
726    pub right_degree_table: ::core::option::Option<super::catalog::Table>,
727    /// The output indices of the current node.
728    #[prost(uint32, repeated, tag = "10")]
729    pub output_indices: ::prost::alloc::vec::Vec<u32>,
730    /// Left deduped input pk indices. The pk of the left_table and
731    /// left_degree_table is \[left_join_key | left_deduped_input_pk_indices\]
732    /// and is expected to be the shortest key which starts with
733    /// the join key and satisfies the unique constraint.
734    #[prost(uint32, repeated, tag = "11")]
735    pub left_deduped_input_pk_indices: ::prost::alloc::vec::Vec<u32>,
736    /// Right deduped input pk indices. The pk of the right_table and
737    /// right_degree_table is \[right_join_key | right_deduped_input_pk_indices\]
738    /// and is expected to be the shortest key which starts with
739    /// the join key and satisfies the unique constraint.
740    #[prost(uint32, repeated, tag = "12")]
741    pub right_deduped_input_pk_indices: ::prost::alloc::vec::Vec<u32>,
    /// NOTE(review): presumably one flag per join key pair selecting null-safe equality — confirm.
742    #[prost(bool, repeated, tag = "13")]
743    pub null_safe: ::prost::alloc::vec::Vec<bool>,
744    /// Whether to optimize for an append-only stream.
745    /// It is true when the input is append-only.
746    #[prost(bool, tag = "14")]
747    pub is_append_only: bool,
748}
/// Plan node for an AS OF join: the join type, the join keys of both sides,
/// one internal state table per side, and the `asof_desc` descriptor.
749#[derive(prost_helpers::AnyPB)]
750#[derive(Clone, PartialEq, ::prost::Message)]
751pub struct AsOfJoinNode {
    /// Raw `i32` value of `super::plan_common::AsOfJoinType`.
752    #[prost(enumeration = "super::plan_common::AsOfJoinType", tag = "1")]
753    pub join_type: i32,
    /// Join key columns of the left input.
754    #[prost(int32, repeated, tag = "2")]
755    pub left_key: ::prost::alloc::vec::Vec<i32>,
    /// Join key columns of the right input.
756    #[prost(int32, repeated, tag = "3")]
757    pub right_key: ::prost::alloc::vec::Vec<i32>,
758    /// Used for internal table states.
759    #[prost(message, optional, tag = "4")]
760    pub left_table: ::core::option::Option<super::catalog::Table>,
761    /// Used for internal table states.
762    #[prost(message, optional, tag = "5")]
763    pub right_table: ::core::option::Option<super::catalog::Table>,
764    /// The output indices of the current node.
765    #[prost(uint32, repeated, tag = "6")]
766    pub output_indices: ::prost::alloc::vec::Vec<u32>,
767    /// Left deduped input pk indices.
768    /// The pk of the left_table is \[left_join_key | left_inequality_key | left_deduped_input_pk_indices\]
769    /// left_inequality_key is not used but for forward compatibility.
770    #[prost(uint32, repeated, tag = "7")]
771    pub left_deduped_input_pk_indices: ::prost::alloc::vec::Vec<u32>,
772    /// Right deduped input pk indices.
773    /// The pk of the right_table is \[right_join_key | right_inequality_key | right_deduped_input_pk_indices\]
774    /// right_inequality_key is not used but for forward compatibility.
775    #[prost(uint32, repeated, tag = "8")]
776    pub right_deduped_input_pk_indices: ::prost::alloc::vec::Vec<u32>,
    /// NOTE(review): presumably one flag per join key pair selecting null-safe equality — confirm.
777    #[prost(bool, repeated, tag = "9")]
778    pub null_safe: ::prost::alloc::vec::Vec<bool>,
    /// AS OF join descriptor; its contents are defined by `super::plan_common::AsOfJoinDesc`.
779    #[prost(message, optional, tag = "10")]
780    pub asof_desc: ::core::option::Option<super::plan_common::AsOfJoinDesc>,
781}
/// Plan node for a temporal join against a lookup-side table described by
/// `table_desc`, with an optional memo state table for the non-append-only case.
782#[derive(prost_helpers::AnyPB)]
783#[derive(Clone, PartialEq, ::prost::Message)]
784pub struct TemporalJoinNode {
    /// Raw `i32` value of `super::plan_common::JoinType`.
785    #[prost(enumeration = "super::plan_common::JoinType", tag = "1")]
786    pub join_type: i32,
    /// Join key columns of the left input.
787    #[prost(int32, repeated, tag = "2")]
788    pub left_key: ::prost::alloc::vec::Vec<i32>,
    /// Join key columns of the right input.
789    #[prost(int32, repeated, tag = "3")]
790    pub right_key: ::prost::alloc::vec::Vec<i32>,
    /// NOTE(review): presumably one flag per join key pair selecting null-safe equality — confirm.
791    #[prost(bool, repeated, tag = "4")]
792    pub null_safe: ::prost::alloc::vec::Vec<bool>,
    /// Optional additional join condition expression.
793    #[prost(message, optional, tag = "5")]
794    pub condition: ::core::option::Option<super::expr::ExprNode>,
795    /// The output indices of the current node.
796    #[prost(uint32, repeated, tag = "6")]
797    pub output_indices: ::prost::alloc::vec::Vec<u32>,
798    /// The table desc of the lookup side table.
799    #[prost(message, optional, tag = "7")]
800    pub table_desc: ::core::option::Option<super::plan_common::StorageTableDesc>,
801    /// The output indices of the lookup side table.
802    #[prost(uint32, repeated, tag = "8")]
803    pub table_output_indices: ::prost::alloc::vec::Vec<u32>,
804    /// The state table used for non-append-only temporal join.
805    #[prost(message, optional, tag = "9")]
806    pub memo_table: ::core::option::Option<super::catalog::Table>,
807    /// Whether this is a nested loop temporal join.
808    #[prost(bool, tag = "10")]
809    pub is_nested_loop: bool,
810}
/// Plan node for a dynamic filter: a comparison predicate (`condition`) whose
/// right-hand side is a single value that is persisted in `right_table`.
811#[derive(prost_helpers::AnyPB)]
812#[derive(Clone, PartialEq, ::prost::Message)]
813pub struct DynamicFilterNode {
    /// NOTE(review): presumably the index of the left input column used in the predicate — confirm.
814    #[prost(uint32, tag = "1")]
815    pub left_key: u32,
816    /// Must be one of <, <=, >, >=
817    #[prost(message, optional, tag = "2")]
818    pub condition: ::core::option::Option<super::expr::ExprNode>,
819    /// Left table stores all states with predicate possibly not NULL.
820    #[prost(message, optional, tag = "3")]
821    pub left_table: ::core::option::Option<super::catalog::Table>,
822    /// Right table stores a single value from the RHS of the predicate.
823    #[prost(message, optional, tag = "4")]
824    pub right_table: ::core::option::Option<super::catalog::Table>,
825    /// Whether a change on the right side always makes the condition more relaxed,
826    /// in other words, makes more records on the left side satisfy the condition.
827    /// If this is true, we need to store LHS records which do not match the condition in the internal table.
828    /// When the condition changes, we will tell downstream to insert the LHS records which now match the condition.
829    /// If this is false, we need to store RHS records which match the condition in the internal table.
830    /// When the condition changes, we will tell downstream to delete the LHS records which now no longer match the condition.
831    #[deprecated]
832    #[prost(bool, tag = "5")]
833    pub condition_always_relax: bool,
834}
835/// Delta join with two indexes. This is a pseudo plan node generated on the frontend. On the
836/// meta service, it will be rewritten into lookup joins.
///
/// Tags 5 and 6 are not used by any field of this message.
837#[derive(prost_helpers::AnyPB)]
838#[derive(Clone, PartialEq, ::prost::Message)]
839pub struct DeltaIndexJoinNode {
    /// Raw `i32` value of `super::plan_common::JoinType`.
840    #[prost(enumeration = "super::plan_common::JoinType", tag = "1")]
841    pub join_type: i32,
    /// Join key columns of the left side.
842    #[prost(int32, repeated, tag = "2")]
843    pub left_key: ::prost::alloc::vec::Vec<i32>,
    /// Join key columns of the right side.
844    #[prost(int32, repeated, tag = "3")]
845    pub right_key: ::prost::alloc::vec::Vec<i32>,
    /// Optional additional join condition expression.
846    #[prost(message, optional, tag = "4")]
847    pub condition: ::core::option::Option<super::expr::ExprNode>,
848    /// Table id of the left index.
849    #[prost(uint32, tag = "7")]
850    pub left_table_id: u32,
851    /// Table id of the right index.
852    #[prost(uint32, tag = "8")]
853    pub right_table_id: u32,
854    /// Info about the left index.
855    #[prost(message, optional, tag = "9")]
856    pub left_info: ::core::option::Option<ArrangementInfo>,
857    /// Info about the right index.
858    #[prost(message, optional, tag = "10")]
859    pub right_info: ::core::option::Option<ArrangementInfo>,
860    /// The output indices of the current node.
861    #[prost(uint32, repeated, tag = "11")]
862    pub output_indices: ::prost::alloc::vec::Vec<u32>,
863}
/// Plan node for a hop window over the `time_col` column, parameterised by the
/// `window_slide` and `window_size` intervals.
864#[derive(prost_helpers::AnyPB)]
865#[derive(Clone, PartialEq, ::prost::Message)]
866pub struct HopWindowNode {
    /// Index of the time column the window is computed on.
867    #[prost(uint32, tag = "1")]
868    pub time_col: u32,
    /// Slide interval of the window.
869    #[prost(message, optional, tag = "2")]
870    pub window_slide: ::core::option::Option<super::data::Interval>,
    /// Size interval of the window.
871    #[prost(message, optional, tag = "3")]
872    pub window_size: ::core::option::Option<super::data::Interval>,
    /// The output indices of the current node.
873    #[prost(uint32, repeated, tag = "4")]
874    pub output_indices: ::prost::alloc::vec::Vec<u32>,
    /// Expressions for window start values (NOTE(review): presumably one per window a row belongs to — confirm).
875    #[prost(message, repeated, tag = "5")]
876    pub window_start_exprs: ::prost::alloc::vec::Vec<super::expr::ExprNode>,
    /// Expressions for window end values; see `window_start_exprs`.
877    #[prost(message, repeated, tag = "6")]
878    pub window_end_exprs: ::prost::alloc::vec::Vec<super::expr::ExprNode>,
879}
/// Plan node identifying the upstream fragment this node receives from, together
/// with the type of the upstream dispatcher.
880#[derive(prost_helpers::AnyPB)]
881#[derive(Clone, PartialEq, ::prost::Message)]
882pub struct MergeNode {
883    /// **WARNING**: Use this field with caution.
884    ///
885    /// `upstream_actor_id` stored in the plan node in `Fragment` meta model cannot be directly used.
886    /// See `compose_fragment`.
887    /// The field is deprecated because the upstream actor info is provided separately instead of
888    /// injected here in the node.
    ///
    /// Note that this repeated field is declared with `packed = "false"`.
889    #[deprecated]
890    #[prost(uint32, repeated, packed = "false", tag = "1")]
891    pub upstream_actor_id: ::prost::alloc::vec::Vec<u32>,
    /// Id of the upstream fragment.
892    #[prost(uint32, tag = "2")]
893    pub upstream_fragment_id: u32,
894    /// Type of the upstream dispatcher. If there's always one upstream according to this
895    /// type, the compute node may use the `ReceiverExecutor` as an optimization.
    /// Stored as the raw `i32` value of `DispatcherType`.
896    #[prost(enumeration = "DispatcherType", tag = "3")]
897    pub upstream_dispatcher_type: i32,
898    /// The schema of input columns. TODO: remove this field.
899    #[prost(message, repeated, tag = "4")]
900    pub fields: ::prost::alloc::vec::Vec<super::plan_common::Field>,
901}
902/// Passed from frontend to meta, used by the fragmenter to generate `MergeNode`
903/// and maybe `DispatcherNode` later.
904#[derive(prost_helpers::AnyPB)]
905#[derive(Clone, PartialEq, ::prost::Message)]
906pub struct ExchangeNode {
    /// Dispatch strategy of this exchange; `None` when absent on the wire.
907    #[prost(message, optional, tag = "1")]
908    pub strategy: ::core::option::Option<DispatchStrategy>,
909}
910/// StreamScanNode reads data from the upstream table first, and then passes all events downstream.
911/// It always has these 2 inputs in the following order:
912/// 1. A MergeNode (as a placeholder) of upstream.
913/// 2. A BatchPlanNode for the snapshot read.
///
/// Tag 6 is not used by any field of this message.
914#[derive(prost_helpers::AnyPB)]
915#[derive(Clone, PartialEq, ::prost::Message)]
916pub struct StreamScanNode {
    /// Id of the table being scanned (NOTE(review): presumably the upstream table — confirm).
917    #[prost(uint32, tag = "1")]
918    pub table_id: u32,
919    /// The columns from the upstream table that'll be internally required by this stream scan node.
920    /// - For non-backfill stream scan node, it's the same as the output columns.
921    /// - For backfill stream scan node, there're additionally primary key columns.
922    #[prost(int32, repeated, tag = "2")]
923    pub upstream_column_ids: ::prost::alloc::vec::Vec<i32>,
924    /// The columns to be output by this stream scan node. The index is based on the internal required columns.
925    /// - For non-backfill stream scan node, it's simply all the columns.
926    /// - For backfill stream scan node, this strips the primary key columns if they're unnecessary.
927    #[prost(uint32, repeated, tag = "3")]
928    pub output_indices: ::prost::alloc::vec::Vec<u32>,
929    /// Generally, the barrier needs to be rearranged during the MV creation process, so that data can
930    /// be flushed to shared buffer periodically, instead of making the first epoch from batch query extra
931    /// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in StreamScanNode.
932    /// StreamScanType is used to decide which implementation to use for the StreamScanNode.
933    #[prost(enumeration = "StreamScanType", tag = "4")]
934    pub stream_scan_type: i32,
935    /// The state table used by the Backfill operator for persisting internal state.
936    #[prost(message, optional, tag = "5")]
937    pub state_table: ::core::option::Option<super::catalog::Table>,
938    /// The upstream materialized view info used by backfill.
939    /// Used iff `ChainType::Backfill`.
    /// NOTE(review): `ChainType` presumably refers to what is now `StreamScanType` — confirm.
940    #[prost(message, optional, tag = "7")]
941    pub table_desc: ::core::option::Option<super::plan_common::StorageTableDesc>,
942    /// The backfill rate limit for the stream scan node.
943    #[prost(uint32, optional, tag = "8")]
944    pub rate_limit: ::core::option::Option<u32>,
945    /// Snapshot read every N barriers.
946    #[deprecated]
947    #[prost(uint32, tag = "9")]
948    pub snapshot_read_barrier_interval: u32,
949    /// The state table used by ArrangementBackfill to replicate upstream mview's state table.
950    /// Used iff `ChainType::ArrangementBackfill`.
951    #[prost(message, optional, tag = "10")]
952    pub arrangement_table: ::core::option::Option<super::catalog::Table>,
    /// Optional epoch associated with snapshot backfill; its exact semantics are not visible here.
953    #[prost(uint64, optional, tag = "11")]
954    pub snapshot_backfill_epoch: ::core::option::Option<u64>,
955}
956/// Config options for CDC backfill. Carried by `StreamCdcScanNode::options`.
957#[derive(prost_helpers::AnyPB)]
958#[derive(Clone, Copy, PartialEq, ::prost::Message)]
959pub struct StreamCdcScanOptions {
960    /// Whether to skip the backfill and only consume from upstream.
961    #[prost(bool, tag = "1")]
962    pub disable_backfill: bool,
    /// NOTE(review): presumably the number of barriers between snapshot reads — confirm.
963    #[prost(uint32, tag = "2")]
964    pub snapshot_barrier_interval: u32,
    /// NOTE(review): presumably the batch size of a snapshot read; unit (rows?) not visible here — confirm.
965    #[prost(uint32, tag = "3")]
966    pub snapshot_batch_size: u32,
967}
/// Plan node for a stream CDC scan: backfills the external table described by
/// `cdc_table_desc`, persisting backfill state in `state_table`.
968#[derive(prost_helpers::AnyPB)]
969#[derive(Clone, PartialEq, ::prost::Message)]
970pub struct StreamCdcScanNode {
    /// Table id associated with this scan (which catalog object it names is not visible here).
971    #[prost(uint32, tag = "1")]
972    pub table_id: u32,
973    /// The columns from the upstream table that'll be internally required by this stream scan node.
974    /// Contains Primary Keys and Output columns.
975    #[prost(int32, repeated, tag = "2")]
976    pub upstream_column_ids: ::prost::alloc::vec::Vec<i32>,
977    /// Strips the primary key columns if they're unnecessary.
978    #[prost(uint32, repeated, tag = "3")]
979    pub output_indices: ::prost::alloc::vec::Vec<u32>,
980    /// The state table used by the CdcBackfill operator for persisting internal state.
981    #[prost(message, optional, tag = "4")]
982    pub state_table: ::core::option::Option<super::catalog::Table>,
983    /// The external table that will be backfilled for CDC.
984    #[prost(message, optional, tag = "5")]
985    pub cdc_table_desc: ::core::option::Option<super::plan_common::ExternalTableDesc>,
986    /// The backfill rate limit for the stream cdc scan node.
987    #[prost(uint32, optional, tag = "6")]
988    pub rate_limit: ::core::option::Option<u32>,
989    /// Whether to skip the backfill and only consume from upstream.
990    /// Kept for backward compatibility; new stream plans use `options.disable_backfill`.
991    #[prost(bool, tag = "7")]
992    pub disable_backfill: bool,
    /// CDC backfill options; see `StreamCdcScanOptions`.
993    #[prost(message, optional, tag = "8")]
994    pub options: ::core::option::Option<StreamCdcScanOptions>,
995}
996/// BatchPlanNode is used for mv on mv snapshot read.
997/// BatchPlanNode is supposed to carry a batch plan that can be optimized with the streaming plan_common.
998/// Currently, streaming to batch push down is not yet supported, so BatchPlanNode is simply a table scan.
999#[derive(prost_helpers::AnyPB)]
1000#[derive(Clone, PartialEq, ::prost::Message)]
1001pub struct BatchPlanNode {
    /// Description of the storage table to scan.
1002    #[prost(message, optional, tag = "1")]
1003    pub table_desc: ::core::option::Option<super::plan_common::StorageTableDesc>,
    /// Ids of the columns to read.
1004    #[prost(int32, repeated, tag = "2")]
1005    pub column_ids: ::prost::alloc::vec::Vec<i32>,
1006}
/// Describes an arrangement: its order key, column descriptions, storage table
/// description and output columns. Referenced by `DeltaIndexJoinNode`,
/// `ArrangeNode` and `LookupNode`. Tag 3 is not used by any field.
1007#[derive(prost_helpers::AnyPB)]
1008#[derive(Clone, PartialEq, ::prost::Message)]
1009pub struct ArrangementInfo {
1010    /// Order key of the arrangement, including order by columns and pk from the materialize
1011    /// executor.
1012    #[prost(message, repeated, tag = "1")]
1013    pub arrange_key_orders: ::prost::alloc::vec::Vec<super::common::ColumnOrder>,
1014    /// Column descs of the arrangement.
1015    #[prost(message, repeated, tag = "2")]
1016    pub column_descs: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
1017    /// Used to build the storage table by the stream lookup join of a delta join.
1018    #[prost(message, optional, tag = "4")]
1019    pub table_desc: ::core::option::Option<super::plan_common::StorageTableDesc>,
1020    /// Output index columns.
1021    #[prost(uint32, repeated, tag = "5")]
1022    pub output_col_idx: ::prost::alloc::vec::Vec<u32>,
1023}
1024/// Special node for shared state, which will only be produced in the fragmenter. ArrangeNode will
1025/// produce a special Materialize executor, which materializes data for downstream to query.
1026#[derive(prost_helpers::AnyPB)]
1027#[derive(Clone, PartialEq, ::prost::Message)]
1028pub struct ArrangeNode {
1029    /// Info about the arrangement; see `ArrangementInfo`.
1030    #[prost(message, optional, tag = "1")]
1031    pub table_info: ::core::option::Option<ArrangementInfo>,
1032    /// Hash key of the materialize node, which is a subset of pk.
1033    #[prost(uint32, repeated, tag = "2")]
1034    pub distribution_key: ::prost::alloc::vec::Vec<u32>,
1035    /// Used for internal table states.
1036    #[prost(message, optional, tag = "3")]
1037    pub table: ::core::option::Option<super::catalog::Table>,
1038}
1039/// Special node for shared state. LookupNode will join an arrangement with a stream.
1040#[derive(prost_helpers::AnyPB)]
1041#[derive(Clone, PartialEq, ::prost::Message)]
1042pub struct LookupNode {
1043    /// Join key of the arrangement side.
1044    #[prost(int32, repeated, tag = "1")]
1045    pub arrange_key: ::prost::alloc::vec::Vec<i32>,
1046    /// Join key of the stream side.
1047    #[prost(int32, repeated, tag = "2")]
1048    pub stream_key: ::prost::alloc::vec::Vec<i32>,
1049    /// Whether to join the current epoch of the arrangement.
1050    #[prost(bool, tag = "3")]
1051    pub use_current_epoch: bool,
1052    /// Sometimes we need to re-order the output data to meet the requirement of the schema.
1053    /// By default, the lookup executor will produce `<arrangement side, stream side>`. We
1054    /// will then apply the column mapping to the combined result.
1055    #[prost(int32, repeated, tag = "4")]
1056    pub column_mapping: ::prost::alloc::vec::Vec<i32>,
1057    /// Info about the arrangement; see `ArrangementInfo`.
1058    #[prost(message, optional, tag = "7")]
1059    pub arrangement_table_info: ::core::option::Option<ArrangementInfo>,
    /// Id of the arrangement, carried as a oneof: either a table id (tag 5) or an
    /// index id (tag 6). See `lookup_node::ArrangementTableId`.
1060    #[prost(oneof = "lookup_node::ArrangementTableId", tags = "5, 6")]
1061    pub arrangement_table_id: ::core::option::Option<lookup_node::ArrangementTableId>,
1062}
1063/// Nested message and enum types in `LookupNode`.
1064pub mod lookup_node {
    /// Oneof payload of `LookupNode::arrangement_table_id`; a value holds one of the two ids.
1065    #[derive(prost_helpers::AnyPB)]
1066    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
1067    pub enum ArrangementTableId {
1068        /// Table id of the arrangement (when created along with the join plan).
1069        #[prost(uint32, tag = "5")]
1070        TableId(u32),
1071        /// Table id of the arrangement (when using an index).
1072        #[prost(uint32, tag = "6")]
1073        IndexId(u32),
1074    }
1075}
1076/// WatermarkFilter needs to filter the upstream data by the watermark.
1077#[derive(prost_helpers::AnyPB)]
1078#[derive(Clone, PartialEq, ::prost::Message)]
1079pub struct WatermarkFilterNode {
1080    /// The watermark descs.
1081    #[prost(message, repeated, tag = "1")]
1082    pub watermark_descs: ::prost::alloc::vec::Vec<super::catalog::WatermarkDesc>,
1083    /// The tables used to persist watermarks; the key is vnode.
1084    #[prost(message, repeated, tag = "2")]
1085    pub tables: ::prost::alloc::vec::Vec<super::catalog::Table>,
1086}
1087/// Acts like a merger, but on different inputs.
///
/// The message has no fields.
1088#[derive(prost_helpers::AnyPB)]
1089#[derive(Clone, Copy, PartialEq, ::prost::Message)]
1090pub struct UnionNode {}
1091/// Special node for shared state. Merges and aligns barriers from upstreams. Pipes inputs in order.
1092#[derive(prost_helpers::AnyPB)]
1093#[derive(Clone, PartialEq, ::prost::Message)]
1094pub struct LookupUnionNode {
    /// NOTE(review): presumably the order in which the inputs are piped — confirm against the executor.
1095    #[prost(uint32, repeated, tag = "1")]
1096    pub order: ::prost::alloc::vec::Vec<u32>,
1097}
/// Plan node for an expand operator, parameterised by a list of column subsets.
1098#[derive(prost_helpers::AnyPB)]
1099#[derive(Clone, PartialEq, ::prost::Message)]
1100pub struct ExpandNode {
    /// The column subsets; each entry is an `expand_node::Subset`.
1101    #[prost(message, repeated, tag = "1")]
1102    pub column_subsets: ::prost::alloc::vec::Vec<expand_node::Subset>,
1103}
1104/// Nested message and enum types in `ExpandNode`.
1105pub mod expand_node {
    /// One column subset of an `ExpandNode`, given as a list of column indices.
1106    #[derive(prost_helpers::AnyPB)]
1107    #[derive(Clone, PartialEq, ::prost::Message)]
1108    pub struct Subset {
        /// Indices of the columns in this subset.
1109        #[prost(uint32, repeated, tag = "1")]
1110        pub column_indices: ::prost::alloc::vec::Vec<u32>,
1111    }
1112}
/// Plan node for a project-set operator: a select list plus watermark-derivation metadata.
1113#[derive(prost_helpers::AnyPB)]
1114#[derive(Clone, PartialEq, ::prost::Message)]
1115pub struct ProjectSetNode {
    /// Items of the select list.
1116    #[prost(message, repeated, tag = "1")]
1117    pub select_list: ::prost::alloc::vec::Vec<super::expr::ProjectSetSelectItem>,
1118    /// These two fields express a list of usize pairs: when the project receives a
1119    /// watermark with `watermark_input_cols\[i\]` column index, it should derive a new watermark
1120    /// with the `watermark_output_cols\[i\]`th expression.
    /// NOTE(review): `watermark_output_cols` presumably refers to `watermark_expr_indices` below — confirm.
1121    #[prost(uint32, repeated, tag = "2")]
1122    pub watermark_input_cols: ::prost::alloc::vec::Vec<u32>,
    /// Second half of the pair list described on `watermark_input_cols`.
1123    #[prost(uint32, repeated, tag = "3")]
1124    pub watermark_expr_indices: ::prost::alloc::vec::Vec<u32>,
    /// NOTE(review): presumably indices of select-list expressions known to be non-decreasing — confirm.
1125    #[prost(uint32, repeated, tag = "4")]
1126    pub nondecreasing_exprs: ::prost::alloc::vec::Vec<u32>,
1127}
1128/// Sorts inputs and outputs ordered data based on the watermark.
1129#[derive(prost_helpers::AnyPB)]
1130#[derive(Clone, PartialEq, ::prost::Message)]
1131pub struct SortNode {
1132    /// Persists data above the watermark.
1133    #[prost(message, optional, tag = "1")]
1134    pub state_table: ::core::option::Option<super::catalog::Table>,
1135    /// Column index of the watermark used to perform sorting.
1136    #[prost(uint32, tag = "2")]
1137    pub sort_column_index: u32,
1138}
1139/// Merges two streams from streaming and batch for data manipulation.
1140#[derive(prost_helpers::AnyPB)]
1141#[derive(Clone, PartialEq, ::prost::Message)]
1142pub struct DmlNode {
1143    /// Id of the table on which DML performs.
1144    #[prost(uint32, tag = "1")]
1145    pub table_id: u32,
1146    /// Version of the table.
1147    #[prost(uint64, tag = "3")]
1148    pub table_version_id: u64,
1149    /// Column descriptions of the table.
1150    #[prost(message, repeated, tag = "2")]
1151    pub column_descs: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
    /// Optional rate limit for this node; unit and the meaning of `None` are not visible here.
1152    #[prost(uint32, optional, tag = "4")]
1153    pub rate_limit: ::core::option::Option<u32>,
1154}
/// Plan node for row-id generation, parameterised by the index of the row-id column.
1155#[derive(prost_helpers::AnyPB)]
1156#[derive(Clone, Copy, PartialEq, ::prost::Message)]
1157pub struct RowIdGenNode {
    /// Index of the row-id column (NOTE(review): presumably within the node's schema — confirm).
1158    #[prost(uint64, tag = "1")]
1159    pub row_id_index: u64,
1160}
/// Field-less message used as the `UpdateCurrent` variant of `now_node::Mode`.
1161#[derive(prost_helpers::AnyPB)]
1162#[derive(Clone, Copy, PartialEq, ::prost::Message)]
1163pub struct NowModeUpdateCurrent {}
/// Payload of the `GenerateSeries` variant of `now_node::Mode`: a start timestamp and an interval.
1164#[derive(prost_helpers::AnyPB)]
1165#[derive(Clone, PartialEq, ::prost::Message)]
1166pub struct NowModeGenerateSeries {
    /// Start of the series, encoded as a `Datum`.
1167    #[prost(message, optional, tag = "1")]
1168    pub start_timestamp: ::core::option::Option<super::data::Datum>,
    /// Step of the series, encoded as a `Datum`.
1169    #[prost(message, optional, tag = "2")]
1170    pub interval: ::core::option::Option<super::data::Datum>,
1171}
/// Plan node for the `now` operator: a state table plus the operating mode.
1172#[derive(prost_helpers::AnyPB)]
1173#[derive(Clone, PartialEq, ::prost::Message)]
1174pub struct NowNode {
1175    /// Persists emitted 'now'.
1176    #[prost(message, optional, tag = "1")]
1177    pub state_table: ::core::option::Option<super::catalog::Table>,
    /// Operating mode, carried as a oneof: `UpdateCurrent` (tag 101) or
    /// `GenerateSeries` (tag 102). See `now_node::Mode`.
1178    #[prost(oneof = "now_node::Mode", tags = "101, 102")]
1179    pub mode: ::core::option::Option<now_node::Mode>,
1180}
1181/// Nested message and enum types in `NowNode`.
1182pub mod now_node {
    /// Oneof payload of `NowNode::mode`.
1183    #[derive(prost_helpers::AnyPB)]
1184    #[derive(Clone, PartialEq, ::prost::Oneof)]
1185    pub enum Mode {
        /// Carries a field-less `NowModeUpdateCurrent` message.
1186        #[prost(message, tag = "101")]
1187        UpdateCurrent(super::NowModeUpdateCurrent),
        /// Carries a `NowModeGenerateSeries` message (start timestamp and interval).
1188        #[prost(message, tag = "102")]
1189        GenerateSeries(super::NowModeGenerateSeries),
1190    }
1191}
/// Plan node holding literal rows: a list of expression tuples and the field list describing them.
1192#[derive(prost_helpers::AnyPB)]
1193#[derive(Clone, PartialEq, ::prost::Message)]
1194pub struct ValuesNode {
    /// The rows; each is a `values_node::ExprTuple`.
1195    #[prost(message, repeated, tag = "1")]
1196    pub tuples: ::prost::alloc::vec::Vec<values_node::ExprTuple>,
    /// Fields of the node (NOTE(review): presumably the output schema of the tuples — confirm).
1197    #[prost(message, repeated, tag = "2")]
1198    pub fields: ::prost::alloc::vec::Vec<super::plan_common::Field>,
1199}
1200/// Nested message and enum types in `ValuesNode`.
1201pub mod values_node {
    /// One tuple of a `ValuesNode`, given as a list of expression cells.
1202    #[derive(prost_helpers::AnyPB)]
1203    #[derive(Clone, PartialEq, ::prost::Message)]
1204    pub struct ExprTuple {
        /// The expressions of this tuple, one per cell.
1205        #[prost(message, repeated, tag = "1")]
1206        pub cells: ::prost::alloc::vec::Vec<super::super::expr::ExprNode>,
1207    }
1208}
/// Plan node for a dedup operator: a state table and the columns to deduplicate on.
1209#[derive(prost_helpers::AnyPB)]
1210#[derive(Clone, PartialEq, ::prost::Message)]
1211pub struct DedupNode {
    /// State table of the operator (NOTE(review): what exactly it stores is not visible here).
1212    #[prost(message, optional, tag = "1")]
1213    pub state_table: ::core::option::Option<super::catalog::Table>,
    /// Indices of the columns used for deduplication.
1214    #[prost(uint32, repeated, tag = "2")]
1215    pub dedup_column_indices: ::prost::alloc::vec::Vec<u32>,
1216}
/// Field-less plan node message for a no-op operator.
1217#[derive(prost_helpers::AnyPB)]
1218#[derive(Clone, Copy, PartialEq, ::prost::Message)]
1219pub struct NoOpNode {}
/// Plan node for the EOWC variant of the over-window operator: window function
/// calls, partition and order columns, and a state table.
/// NOTE(review): "EOWC" presumably stands for "emit on window close" — confirm.
1220#[derive(prost_helpers::AnyPB)]
1221#[derive(Clone, PartialEq, ::prost::Message)]
1222pub struct EowcOverWindowNode {
    /// The window function calls to evaluate.
1223    #[prost(message, repeated, tag = "1")]
1224    pub calls: ::prost::alloc::vec::Vec<super::expr::WindowFunction>,
    /// Indices of the partition-by columns.
1225    #[prost(uint32, repeated, tag = "2")]
1226    pub partition_by: ::prost::alloc::vec::Vec<u32>,
1227    /// Uses `repeated` in case of future extension; for now only one column is allowed.
1228    #[prost(message, repeated, tag = "3")]
1229    pub order_by: ::prost::alloc::vec::Vec<super::common::ColumnOrder>,
    /// State table of the operator.
1230    #[prost(message, optional, tag = "4")]
1231    pub state_table: ::core::option::Option<super::catalog::Table>,
1232}
/// Plan node for the over-window operator: window function calls, partition and
/// order columns, a state table and a cache policy.
1233#[derive(prost_helpers::AnyPB)]
1234#[derive(Clone, PartialEq, ::prost::Message)]
1235pub struct OverWindowNode {
    /// The window function calls to evaluate.
1236    #[prost(message, repeated, tag = "1")]
1237    pub calls: ::prost::alloc::vec::Vec<super::expr::WindowFunction>,
    /// Indices of the partition-by columns.
1238    #[prost(uint32, repeated, tag = "2")]
1239    pub partition_by: ::prost::alloc::vec::Vec<u32>,
    /// Ordering within a partition.
1240    #[prost(message, repeated, tag = "3")]
1241    pub order_by: ::prost::alloc::vec::Vec<super::common::ColumnOrder>,
    /// State table of the operator.
1242    #[prost(message, optional, tag = "4")]
1243    pub state_table: ::core::option::Option<super::catalog::Table>,
    /// Raw `i32` value of `OverWindowCachePolicy`.
1244    #[prost(enumeration = "OverWindowCachePolicy", tag = "5")]
1245    pub cache_policy: i32,
1246}
/// Plan node for the local stage of an approximate percentile computation.
/// NOTE(review): the role of `base` (presumably the bucketing base) is not visible here — confirm.
1247#[derive(prost_helpers::AnyPB)]
1248#[derive(Clone, Copy, PartialEq, ::prost::Message)]
1249pub struct LocalApproxPercentileNode {
    /// Base parameter; the same field name appears on `GlobalApproxPercentileNode`.
1250    #[prost(double, tag = "1")]
1251    pub base: f64,
    /// NOTE(review): presumably the index of the input column the percentile is computed over — confirm.
1252    #[prost(uint32, tag = "2")]
1253    pub percentile_index: u32,
1254}
/// Plan node for the global stage of an approximate percentile computation,
/// with two state tables (`bucket_state_table` and `count_state_table`).
1255#[derive(prost_helpers::AnyPB)]
1256#[derive(Clone, PartialEq, ::prost::Message)]
1257pub struct GlobalApproxPercentileNode {
    /// Base parameter; the same field name appears on `LocalApproxPercentileNode`.
1258    #[prost(double, tag = "1")]
1259    pub base: f64,
    /// The quantile to compute (NOTE(review): valid range, presumably \[0, 1\], is not visible here).
1260    #[prost(double, tag = "2")]
1261    pub quantile: f64,
    /// State table for buckets.
1262    #[prost(message, optional, tag = "3")]
1263    pub bucket_state_table: ::core::option::Option<super::catalog::Table>,
    /// State table for counts.
1264    #[prost(message, optional, tag = "4")]
1265    pub count_state_table: ::core::option::Option<super::catalog::Table>,
1266}
/// Plan node for a row-merge operator, parameterised by one column index mapping per side.
1267#[derive(prost_helpers::AnyPB)]
1268#[derive(Clone, PartialEq, ::prost::Message)]
1269pub struct RowMergeNode {
    /// Column index mapping for the left-hand side (NOTE(review): presumably input-to-output — confirm).
1270    #[prost(message, optional, tag = "1")]
1271    pub lhs_mapping: ::core::option::Option<super::catalog::ColIndexMapping>,
    /// Column index mapping for the right-hand side.
1272    #[prost(message, optional, tag = "2")]
1273    pub rhs_mapping: ::core::option::Option<super::catalog::ColIndexMapping>,
1274}
/// Plan node for a synced log store: the log store table plus two tuning parameters.
1275#[derive(prost_helpers::AnyPB)]
1276#[derive(Clone, PartialEq, ::prost::Message)]
1277pub struct SyncLogStoreNode {
    /// Table backing the log store.
1278    #[prost(message, optional, tag = "1")]
1279    pub log_store_table: ::core::option::Option<super::catalog::Table>,
    /// A pause duration in milliseconds (what is paused, and when, is not visible here).
1280    #[prost(uint32, tag = "2")]
1281    pub pause_duration_ms: u32,
    /// Size of the buffer (NOTE(review): unit — rows, chunks or bytes — is not visible here; confirm).
1282    #[prost(uint32, tag = "3")]
1283    pub buffer_size: u32,
1284}
/// Plan node for materialized expressions: the expressions, a state table and an
/// optional state-cleaning column index.
1285#[derive(prost_helpers::AnyPB)]
1286#[derive(Clone, PartialEq, ::prost::Message)]
1287pub struct MaterializedExprsNode {
    /// The expressions of this node.
1288    #[prost(message, repeated, tag = "1")]
1289    pub exprs: ::prost::alloc::vec::Vec<super::expr::ExprNode>,
    /// State table of the operator.
1290    #[prost(message, optional, tag = "2")]
1291    pub state_table: ::core::option::Option<super::catalog::Table>,
    /// NOTE(review): presumably the index of the column used to clean state; `None` when unset — confirm.
1292    #[prost(uint32, optional, tag = "3")]
1293    pub state_clean_col_idx: ::core::option::Option<u32>,
1294}
/// A node of the stream plan tree: common metadata (`operator_id`, `stream_key`,
/// `append_only`, `identity`, `fields`), its child nodes (`input`), and the
/// operator-specific payload (`node_body`).
1295#[derive(prost_helpers::AnyPB)]
1296#[derive(Clone, PartialEq, ::prost::Message)]
1297pub struct StreamNode {
1298    /// The id for the operator. This is local per mview.
1299    /// TODO: should better be a uint32.
1300    #[prost(uint64, tag = "1")]
1301    pub operator_id: u64,
1302    /// Child nodes in the plan, a.k.a. upstream nodes in the streaming DAG.
1303    #[prost(message, repeated, tag = "3")]
1304    pub input: ::prost::alloc::vec::Vec<StreamNode>,
    /// Stream key of this node (NOTE(review): presumably column indices into `fields` — confirm).
1305    #[prost(uint32, repeated, tag = "2")]
1306    pub stream_key: ::prost::alloc::vec::Vec<u32>,
    /// NOTE(review): presumably whether this node's output is append-only — confirm.
1307    #[prost(bool, tag = "24")]
1308    pub append_only: bool,
    /// Identity string of the node (its format and use are not visible here).
1309    #[prost(string, tag = "18")]
1310    pub identity: ::prost::alloc::string::String,
1311    /// The schema of the plan node.
1312    #[prost(message, repeated, tag = "19")]
1313    pub fields: ::prost::alloc::vec::Vec<super::plan_common::Field>,
    /// The operator-specific body, carried as a oneof over `stream_node::NodeBody`.
    /// The tag list runs from 100 to 149 with 141 absent.
1314    #[prost(
1315        oneof = "stream_node::NodeBody",
1316        tags = "100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 142, 143, 144, 145, 146, 147, 148, 149"
1317    )]
1318    pub node_body: ::core::option::Option<stream_node::NodeBody>,
1319}
1320/// Nested message and enum types in `StreamNode`.
1321pub mod stream_node {
1322    #[derive(prost_helpers::AnyPB)]
1323    #[derive(::enum_as_inner::EnumAsInner, ::strum::Display, ::strum::EnumDiscriminants)]
1324    #[derive(Clone, PartialEq, ::prost::Oneof)]
1325    pub enum NodeBody {
1326        #[prost(message, tag = "100")]
1327        Source(::prost::alloc::boxed::Box<super::SourceNode>),
1328        #[prost(message, tag = "101")]
1329        Project(::prost::alloc::boxed::Box<super::ProjectNode>),
1330        #[prost(message, tag = "102")]
1331        Filter(::prost::alloc::boxed::Box<super::FilterNode>),
1332        #[prost(message, tag = "103")]
1333        Materialize(::prost::alloc::boxed::Box<super::MaterializeNode>),
1334        #[prost(message, tag = "104")]
1335        StatelessSimpleAgg(::prost::alloc::boxed::Box<super::SimpleAggNode>),
1336        #[prost(message, tag = "105")]
1337        SimpleAgg(::prost::alloc::boxed::Box<super::SimpleAggNode>),
1338        #[prost(message, tag = "106")]
1339        HashAgg(::prost::alloc::boxed::Box<super::HashAggNode>),
1340        #[prost(message, tag = "107")]
1341        AppendOnlyTopN(::prost::alloc::boxed::Box<super::TopNNode>),
1342        #[prost(message, tag = "108")]
1343        HashJoin(::prost::alloc::boxed::Box<super::HashJoinNode>),
1344        #[prost(message, tag = "109")]
1345        TopN(::prost::alloc::boxed::Box<super::TopNNode>),
1346        #[prost(message, tag = "110")]
1347        HopWindow(::prost::alloc::boxed::Box<super::HopWindowNode>),
1348        #[prost(message, tag = "111")]
1349        Merge(::prost::alloc::boxed::Box<super::MergeNode>),
1350        #[prost(message, tag = "112")]
1351        Exchange(::prost::alloc::boxed::Box<super::ExchangeNode>),
1352        #[prost(message, tag = "113")]
1353        StreamScan(::prost::alloc::boxed::Box<super::StreamScanNode>),
1354        #[prost(message, tag = "114")]
1355        BatchPlan(::prost::alloc::boxed::Box<super::BatchPlanNode>),
1356        #[prost(message, tag = "115")]
1357        Lookup(::prost::alloc::boxed::Box<super::LookupNode>),
1358        #[prost(message, tag = "116")]
1359        Arrange(::prost::alloc::boxed::Box<super::ArrangeNode>),
1360        #[prost(message, tag = "117")]
1361        LookupUnion(::prost::alloc::boxed::Box<super::LookupUnionNode>),
1362        #[prost(message, tag = "118")]
1363        Union(super::UnionNode),
1364        #[prost(message, tag = "119")]
1365        DeltaIndexJoin(::prost::alloc::boxed::Box<super::DeltaIndexJoinNode>),
1366        #[prost(message, tag = "120")]
1367        Sink(::prost::alloc::boxed::Box<super::SinkNode>),
1368        #[prost(message, tag = "121")]
1369        Expand(::prost::alloc::boxed::Box<super::ExpandNode>),
1370        #[prost(message, tag = "122")]
1371        DynamicFilter(::prost::alloc::boxed::Box<super::DynamicFilterNode>),
1372        #[prost(message, tag = "123")]
1373        ProjectSet(::prost::alloc::boxed::Box<super::ProjectSetNode>),
1374        #[prost(message, tag = "124")]
1375        GroupTopN(::prost::alloc::boxed::Box<super::GroupTopNNode>),
1376        #[prost(message, tag = "125")]
1377        Sort(::prost::alloc::boxed::Box<super::SortNode>),
1378        #[prost(message, tag = "126")]
1379        WatermarkFilter(::prost::alloc::boxed::Box<super::WatermarkFilterNode>),
1380        #[prost(message, tag = "127")]
1381        Dml(::prost::alloc::boxed::Box<super::DmlNode>),
1382        #[prost(message, tag = "128")]
1383        RowIdGen(::prost::alloc::boxed::Box<super::RowIdGenNode>),
1384        #[prost(message, tag = "129")]
1385        Now(::prost::alloc::boxed::Box<super::NowNode>),
1386        #[prost(message, tag = "130")]
1387        AppendOnlyGroupTopN(::prost::alloc::boxed::Box<super::GroupTopNNode>),
1388        #[prost(message, tag = "131")]
1389        TemporalJoin(::prost::alloc::boxed::Box<super::TemporalJoinNode>),
1390        #[prost(message, tag = "132")]
1391        BarrierRecv(::prost::alloc::boxed::Box<super::BarrierRecvNode>),
1392        #[prost(message, tag = "133")]
1393        Values(::prost::alloc::boxed::Box<super::ValuesNode>),
1394        #[prost(message, tag = "134")]
1395        AppendOnlyDedup(::prost::alloc::boxed::Box<super::DedupNode>),
1396        #[prost(message, tag = "135")]
1397        NoOp(super::NoOpNode),
1398        #[prost(message, tag = "136")]
1399        EowcOverWindow(::prost::alloc::boxed::Box<super::EowcOverWindowNode>),
1400        #[prost(message, tag = "137")]
1401        OverWindow(::prost::alloc::boxed::Box<super::OverWindowNode>),
1402        #[prost(message, tag = "138")]
1403        StreamFsFetch(::prost::alloc::boxed::Box<super::StreamFsFetchNode>),
1404        #[prost(message, tag = "139")]
1405        StreamCdcScan(::prost::alloc::boxed::Box<super::StreamCdcScanNode>),
1406        #[prost(message, tag = "140")]
1407        CdcFilter(::prost::alloc::boxed::Box<super::CdcFilterNode>),
1408        #[prost(message, tag = "142")]
1409        SourceBackfill(::prost::alloc::boxed::Box<super::SourceBackfillNode>),
1410        #[prost(message, tag = "143")]
1411        Changelog(::prost::alloc::boxed::Box<super::ChangeLogNode>),
1412        #[prost(message, tag = "144")]
1413        LocalApproxPercentile(
1414            ::prost::alloc::boxed::Box<super::LocalApproxPercentileNode>,
1415        ),
1416        #[prost(message, tag = "145")]
1417        GlobalApproxPercentile(
1418            ::prost::alloc::boxed::Box<super::GlobalApproxPercentileNode>,
1419        ),
1420        #[prost(message, tag = "146")]
1421        RowMerge(::prost::alloc::boxed::Box<super::RowMergeNode>),
1422        #[prost(message, tag = "147")]
1423        AsOfJoin(::prost::alloc::boxed::Box<super::AsOfJoinNode>),
1424        #[prost(message, tag = "148")]
1425        SyncLogStore(::prost::alloc::boxed::Box<super::SyncLogStoreNode>),
1426        #[prost(message, tag = "149")]
1427        MaterializedExprs(::prost::alloc::boxed::Box<super::MaterializedExprsNode>),
1428    }
1429}
1430/// The property of an edge in the fragment graph.
1431/// This is essientially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details.
1432#[derive(prost_helpers::AnyPB)]
1433#[derive(Clone, PartialEq, ::prost::Message)]
1434pub struct DispatchStrategy {
1435    #[prost(enumeration = "DispatcherType", tag = "1")]
1436    pub r#type: i32,
1437    #[prost(uint32, repeated, tag = "2")]
1438    pub dist_key_indices: ::prost::alloc::vec::Vec<u32>,
1439    #[prost(uint32, repeated, tag = "3")]
1440    pub output_indices: ::prost::alloc::vec::Vec<u32>,
1441}
1442/// A dispatcher redistribute messages.
1443/// We encode both the type and other usage information in the proto.
1444#[derive(prost_helpers::AnyPB)]
1445#[derive(Clone, PartialEq, ::prost::Message)]
1446pub struct Dispatcher {
1447    #[prost(enumeration = "DispatcherType", tag = "1")]
1448    pub r#type: i32,
1449    /// Indices of the columns to be used for hashing.
1450    /// For dispatcher types other than HASH, this is ignored.
1451    #[prost(uint32, repeated, tag = "2")]
1452    pub dist_key_indices: ::prost::alloc::vec::Vec<u32>,
1453    /// Indices of the columns to output.
1454    /// In most cases, this contains all columns in the input. But for some cases like MV on MV or
1455    /// schema change, we may only output a subset of the columns.
1456    #[prost(uint32, repeated, tag = "6")]
1457    pub output_indices: ::prost::alloc::vec::Vec<u32>,
1458    /// The hash mapping for consistent hash.
1459    /// For dispatcher types other than HASH, this is ignored.
1460    #[prost(message, optional, tag = "3")]
1461    pub hash_mapping: ::core::option::Option<ActorMapping>,
1462    /// Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
1463    /// This is exactly the same as its downstream fragment id.
1464    #[prost(uint64, tag = "4")]
1465    pub dispatcher_id: u64,
1466    /// Number of downstreams decides how many endpoints a dispatcher should dispatch.
1467    #[prost(uint32, repeated, tag = "5")]
1468    pub downstream_actor_id: ::prost::alloc::vec::Vec<u32>,
1469}
1470/// A StreamActor is a running fragment of the overall stream graph,
1471#[derive(prost_helpers::AnyPB)]
1472#[derive(Clone, PartialEq, ::prost::Message)]
1473pub struct StreamActor {
1474    #[prost(uint32, tag = "1")]
1475    pub actor_id: u32,
1476    #[prost(uint32, tag = "2")]
1477    pub fragment_id: u32,
1478    #[prost(message, repeated, tag = "4")]
1479    pub dispatcher: ::prost::alloc::vec::Vec<Dispatcher>,
1480    /// Vnodes that the executors in this actor own.
1481    /// If the fragment is a singleton, this field will not be set and leave a `None`.
1482    #[prost(message, optional, tag = "8")]
1483    pub vnode_bitmap: ::core::option::Option<super::common::Buffer>,
1484    /// The SQL definition of this materialized view. Used for debugging only.
1485    #[prost(string, tag = "9")]
1486    pub mview_definition: ::prost::alloc::string::String,
1487    /// Provide the necessary context, e.g. session info like time zone, for the actor.
1488    #[prost(message, optional, tag = "10")]
1489    pub expr_context: ::core::option::Option<super::plan_common::ExprContext>,
1490}
1491/// The streaming context associated with a stream plan
1492#[derive(prost_helpers::AnyPB)]
1493#[derive(Clone, PartialEq, ::prost::Message)]
1494pub struct StreamContext {
1495    /// The timezone associated with the streaming plan. Only applies to MV for now.
1496    #[prost(string, tag = "1")]
1497    pub timezone: ::prost::alloc::string::String,
1498}
1499/// Representation of a graph of stream fragments.
1500/// Generated by the fragmenter in the frontend, only used in DDL requests and never persisted.
1501///
1502/// For the persisted form, see `TableFragments`.
1503#[derive(prost_helpers::AnyPB)]
1504#[derive(Clone, PartialEq, ::prost::Message)]
1505pub struct StreamFragmentGraph {
1506    /// all the fragments in the graph.
1507    #[prost(map = "uint32, message", tag = "1")]
1508    pub fragments: ::std::collections::HashMap<
1509        u32,
1510        stream_fragment_graph::StreamFragment,
1511    >,
1512    /// edges between fragments.
1513    #[prost(message, repeated, tag = "2")]
1514    pub edges: ::prost::alloc::vec::Vec<stream_fragment_graph::StreamFragmentEdge>,
1515    #[prost(uint32, repeated, tag = "3")]
1516    pub dependent_table_ids: ::prost::alloc::vec::Vec<u32>,
1517    #[prost(uint32, tag = "4")]
1518    pub table_ids_cnt: u32,
1519    #[prost(message, optional, tag = "5")]
1520    pub ctx: ::core::option::Option<StreamContext>,
1521    /// If none, default parallelism will be applied.
1522    #[prost(message, optional, tag = "6")]
1523    pub parallelism: ::core::option::Option<stream_fragment_graph::Parallelism>,
1524    /// Specified max parallelism, i.e., expected vnode count for the graph.
1525    ///
1526    /// The scheduler on the meta service will use this as a hint to decide the vnode count
1527    /// for each fragment.
1528    ///
1529    /// Note that the actual vnode count may be different from this value.
1530    /// For example, a no-shuffle exchange between current fragment graph and an existing
1531    /// upstream fragment graph requires two fragments to be in the same distribution,
1532    /// thus the same vnode count.
1533    #[prost(uint32, tag = "7")]
1534    pub max_parallelism: u32,
1535}
1536/// Nested message and enum types in `StreamFragmentGraph`.
1537pub mod stream_fragment_graph {
1538    #[derive(prost_helpers::AnyPB)]
1539    #[derive(Clone, PartialEq, ::prost::Message)]
1540    pub struct StreamFragment {
1541        /// 0-based on frontend, and will be rewritten to global id on meta.
1542        #[prost(uint32, tag = "1")]
1543        pub fragment_id: u32,
1544        /// root stream node in this fragment.
1545        #[prost(message, optional, tag = "2")]
1546        pub node: ::core::option::Option<super::StreamNode>,
1547        /// Bitwise-OR of `FragmentTypeFlag`s
1548        #[prost(uint32, tag = "3")]
1549        pub fragment_type_mask: u32,
1550        /// Mark whether this fragment requires exactly one actor.
1551        /// Note: if this is `false`, the fragment may still be a singleton according to the scheduler.
1552        /// One should check `meta.Fragment.distribution_type` for the final result.
1553        #[prost(bool, tag = "4")]
1554        pub requires_singleton: bool,
1555        /// Number of table ids (stateful states) for this fragment.
1556        #[prost(uint32, tag = "5")]
1557        pub table_ids_cnt: u32,
1558        /// Mark the upstream table ids of this fragment, Used for fragments with `StreamScan`s.
1559        #[prost(uint32, repeated, tag = "6")]
1560        pub upstream_table_ids: ::prost::alloc::vec::Vec<u32>,
1561    }
1562    #[derive(prost_helpers::AnyPB)]
1563    #[derive(Clone, PartialEq, ::prost::Message)]
1564    pub struct StreamFragmentEdge {
1565        /// Dispatch strategy for the fragment.
1566        #[prost(message, optional, tag = "1")]
1567        pub dispatch_strategy: ::core::option::Option<super::DispatchStrategy>,
1568        /// A unique identifier of this edge. Generally it should be exchange node's operator id. When
1569        /// rewriting fragments into delta joins or when inserting 1-to-1 exchange, there will be
1570        /// virtual links generated.
1571        #[prost(uint64, tag = "3")]
1572        pub link_id: u64,
1573        #[prost(uint32, tag = "4")]
1574        pub upstream_id: u32,
1575        #[prost(uint32, tag = "5")]
1576        pub downstream_id: u32,
1577    }
1578    #[derive(prost_helpers::AnyPB)]
1579    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
1580    pub struct Parallelism {
1581        #[prost(uint64, tag = "1")]
1582        pub parallelism: u64,
1583    }
1584}
1585#[derive(prost_helpers::AnyPB)]
1586#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
1587#[repr(i32)]
1588pub enum SinkLogStoreType {
1589    /// / Default value is the normal in memory log store to be backward compatible with the previously unset value
1590    Unspecified = 0,
1591    KvLogStore = 1,
1592    InMemoryLogStore = 2,
1593}
1594impl SinkLogStoreType {
1595    /// String value of the enum field names used in the ProtoBuf definition.
1596    ///
1597    /// The values are not transformed in any way and thus are considered stable
1598    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
1599    pub fn as_str_name(&self) -> &'static str {
1600        match self {
1601            Self::Unspecified => "SINK_LOG_STORE_TYPE_UNSPECIFIED",
1602            Self::KvLogStore => "SINK_LOG_STORE_TYPE_KV_LOG_STORE",
1603            Self::InMemoryLogStore => "SINK_LOG_STORE_TYPE_IN_MEMORY_LOG_STORE",
1604        }
1605    }
1606    /// Creates an enum from field names used in the ProtoBuf definition.
1607    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
1608        match value {
1609            "SINK_LOG_STORE_TYPE_UNSPECIFIED" => Some(Self::Unspecified),
1610            "SINK_LOG_STORE_TYPE_KV_LOG_STORE" => Some(Self::KvLogStore),
1611            "SINK_LOG_STORE_TYPE_IN_MEMORY_LOG_STORE" => Some(Self::InMemoryLogStore),
1612            _ => None,
1613        }
1614    }
1615}
1616#[derive(prost_helpers::AnyPB)]
1617#[derive(prost_helpers::Version)]
1618#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
1619#[repr(i32)]
1620pub enum AggNodeVersion {
1621    Unspecified = 0,
1622    /// <https://github.com/risingwavelabs/risingwave/issues/12140#issuecomment-1776289808>
1623    Issue12140 = 1,
1624    /// <https://github.com/risingwavelabs/risingwave/issues/13465#issuecomment-1821016508>
1625    Issue13465 = 2,
1626}
1627impl AggNodeVersion {
1628    /// String value of the enum field names used in the ProtoBuf definition.
1629    ///
1630    /// The values are not transformed in any way and thus are considered stable
1631    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
1632    pub fn as_str_name(&self) -> &'static str {
1633        match self {
1634            Self::Unspecified => "AGG_NODE_VERSION_UNSPECIFIED",
1635            Self::Issue12140 => "AGG_NODE_VERSION_ISSUE_12140",
1636            Self::Issue13465 => "AGG_NODE_VERSION_ISSUE_13465",
1637        }
1638    }
1639    /// Creates an enum from field names used in the ProtoBuf definition.
1640    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
1641        match value {
1642            "AGG_NODE_VERSION_UNSPECIFIED" => Some(Self::Unspecified),
1643            "AGG_NODE_VERSION_ISSUE_12140" => Some(Self::Issue12140),
1644            "AGG_NODE_VERSION_ISSUE_13465" => Some(Self::Issue13465),
1645            _ => None,
1646        }
1647    }
1648}
1649/// Decides which kind of Executor will be used
1650#[derive(prost_helpers::AnyPB)]
1651#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
1652#[repr(i32)]
1653pub enum StreamScanType {
1654    Unspecified = 0,
1655    /// ChainExecutor
1656    Chain = 1,
1657    /// RearrangedChainExecutor
1658    Rearrange = 2,
1659    /// BackfillExecutor
1660    Backfill = 3,
1661    /// ChainExecutor with upstream_only = true
1662    UpstreamOnly = 4,
1663    /// ArrangementBackfillExecutor
1664    ArrangementBackfill = 5,
1665    /// SnapshotBackfillExecutor
1666    SnapshotBackfill = 6,
1667    /// SnapshotBackfillExecutor
1668    CrossDbSnapshotBackfill = 7,
1669}
1670impl StreamScanType {
1671    /// String value of the enum field names used in the ProtoBuf definition.
1672    ///
1673    /// The values are not transformed in any way and thus are considered stable
1674    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
1675    pub fn as_str_name(&self) -> &'static str {
1676        match self {
1677            Self::Unspecified => "STREAM_SCAN_TYPE_UNSPECIFIED",
1678            Self::Chain => "STREAM_SCAN_TYPE_CHAIN",
1679            Self::Rearrange => "STREAM_SCAN_TYPE_REARRANGE",
1680            Self::Backfill => "STREAM_SCAN_TYPE_BACKFILL",
1681            Self::UpstreamOnly => "STREAM_SCAN_TYPE_UPSTREAM_ONLY",
1682            Self::ArrangementBackfill => "STREAM_SCAN_TYPE_ARRANGEMENT_BACKFILL",
1683            Self::SnapshotBackfill => "STREAM_SCAN_TYPE_SNAPSHOT_BACKFILL",
1684            Self::CrossDbSnapshotBackfill => {
1685                "STREAM_SCAN_TYPE_CROSS_DB_SNAPSHOT_BACKFILL"
1686            }
1687        }
1688    }
1689    /// Creates an enum from field names used in the ProtoBuf definition.
1690    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
1691        match value {
1692            "STREAM_SCAN_TYPE_UNSPECIFIED" => Some(Self::Unspecified),
1693            "STREAM_SCAN_TYPE_CHAIN" => Some(Self::Chain),
1694            "STREAM_SCAN_TYPE_REARRANGE" => Some(Self::Rearrange),
1695            "STREAM_SCAN_TYPE_BACKFILL" => Some(Self::Backfill),
1696            "STREAM_SCAN_TYPE_UPSTREAM_ONLY" => Some(Self::UpstreamOnly),
1697            "STREAM_SCAN_TYPE_ARRANGEMENT_BACKFILL" => Some(Self::ArrangementBackfill),
1698            "STREAM_SCAN_TYPE_SNAPSHOT_BACKFILL" => Some(Self::SnapshotBackfill),
1699            "STREAM_SCAN_TYPE_CROSS_DB_SNAPSHOT_BACKFILL" => {
1700                Some(Self::CrossDbSnapshotBackfill)
1701            }
1702            _ => None,
1703        }
1704    }
1705}
1706#[derive(prost_helpers::AnyPB)]
1707#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
1708#[repr(i32)]
1709pub enum OverWindowCachePolicy {
1710    Unspecified = 0,
1711    Full = 1,
1712    Recent = 2,
1713    RecentFirstN = 3,
1714    RecentLastN = 4,
1715}
1716impl OverWindowCachePolicy {
1717    /// String value of the enum field names used in the ProtoBuf definition.
1718    ///
1719    /// The values are not transformed in any way and thus are considered stable
1720    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
1721    pub fn as_str_name(&self) -> &'static str {
1722        match self {
1723            Self::Unspecified => "OVER_WINDOW_CACHE_POLICY_UNSPECIFIED",
1724            Self::Full => "OVER_WINDOW_CACHE_POLICY_FULL",
1725            Self::Recent => "OVER_WINDOW_CACHE_POLICY_RECENT",
1726            Self::RecentFirstN => "OVER_WINDOW_CACHE_POLICY_RECENT_FIRST_N",
1727            Self::RecentLastN => "OVER_WINDOW_CACHE_POLICY_RECENT_LAST_N",
1728        }
1729    }
1730    /// Creates an enum from field names used in the ProtoBuf definition.
1731    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
1732        match value {
1733            "OVER_WINDOW_CACHE_POLICY_UNSPECIFIED" => Some(Self::Unspecified),
1734            "OVER_WINDOW_CACHE_POLICY_FULL" => Some(Self::Full),
1735            "OVER_WINDOW_CACHE_POLICY_RECENT" => Some(Self::Recent),
1736            "OVER_WINDOW_CACHE_POLICY_RECENT_FIRST_N" => Some(Self::RecentFirstN),
1737            "OVER_WINDOW_CACHE_POLICY_RECENT_LAST_N" => Some(Self::RecentLastN),
1738            _ => None,
1739        }
1740    }
1741}
1742#[derive(prost_helpers::AnyPB)]
1743#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
1744#[repr(i32)]
1745pub enum DispatcherType {
1746    Unspecified = 0,
1747    /// Dispatch by hash key, hashed by consistent hash.
1748    Hash = 1,
1749    /// Broadcast to all downstreams.
1750    ///
1751    /// Note a broadcast cannot be represented as multiple simple dispatchers, since they are
1752    /// different when we update dispatchers during scaling.
1753    Broadcast = 2,
1754    /// Only one downstream.
1755    Simple = 3,
1756    /// A special kind of exchange that doesn't involve shuffle. The upstream actor will be directly
1757    /// piped into the downstream actor, if there are the same number of actors. If number of actors
1758    /// are not the same, should use hash instead. Should be only used when distribution is the same.
1759    NoShuffle = 4,
1760}
1761impl DispatcherType {
1762    /// String value of the enum field names used in the ProtoBuf definition.
1763    ///
1764    /// The values are not transformed in any way and thus are considered stable
1765    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
1766    pub fn as_str_name(&self) -> &'static str {
1767        match self {
1768            Self::Unspecified => "DISPATCHER_TYPE_UNSPECIFIED",
1769            Self::Hash => "DISPATCHER_TYPE_HASH",
1770            Self::Broadcast => "DISPATCHER_TYPE_BROADCAST",
1771            Self::Simple => "DISPATCHER_TYPE_SIMPLE",
1772            Self::NoShuffle => "DISPATCHER_TYPE_NO_SHUFFLE",
1773        }
1774    }
1775    /// Creates an enum from field names used in the ProtoBuf definition.
1776    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
1777        match value {
1778            "DISPATCHER_TYPE_UNSPECIFIED" => Some(Self::Unspecified),
1779            "DISPATCHER_TYPE_HASH" => Some(Self::Hash),
1780            "DISPATCHER_TYPE_BROADCAST" => Some(Self::Broadcast),
1781            "DISPATCHER_TYPE_SIMPLE" => Some(Self::Simple),
1782            "DISPATCHER_TYPE_NO_SHUFFLE" => Some(Self::NoShuffle),
1783            _ => None,
1784        }
1785    }
1786}
1787/// Indicates whether the fragment contains some special kind of nodes.
1788#[derive(prost_helpers::AnyPB)]
1789#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
1790#[repr(i32)]
1791pub enum FragmentTypeFlag {
1792    FragmentUnspecified = 0,
1793    Source = 1,
1794    Mview = 2,
1795    Sink = 4,
1796    /// TODO: Remove this and insert a `BarrierRecv` instead.
1797    Now = 8,
1798    /// Include StreamScan and StreamCdcScan
1799    StreamScan = 16,
1800    BarrierRecv = 32,
1801    Values = 64,
1802    Dml = 128,
1803    CdcFilter = 256,
1804    SourceScan = 1024,
1805    SnapshotBackfillStreamScan = 2048,
1806    /// Note: this flag is not available in old fragments, so only suitable for debugging purpose.
1807    FsFetch = 4096,
1808    CrossDbSnapshotBackfillStreamScan = 8192,
1809}
1810impl FragmentTypeFlag {
1811    /// String value of the enum field names used in the ProtoBuf definition.
1812    ///
1813    /// The values are not transformed in any way and thus are considered stable
1814    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
1815    pub fn as_str_name(&self) -> &'static str {
1816        match self {
1817            Self::FragmentUnspecified => "FRAGMENT_TYPE_FLAG_FRAGMENT_UNSPECIFIED",
1818            Self::Source => "FRAGMENT_TYPE_FLAG_SOURCE",
1819            Self::Mview => "FRAGMENT_TYPE_FLAG_MVIEW",
1820            Self::Sink => "FRAGMENT_TYPE_FLAG_SINK",
1821            Self::Now => "FRAGMENT_TYPE_FLAG_NOW",
1822            Self::StreamScan => "FRAGMENT_TYPE_FLAG_STREAM_SCAN",
1823            Self::BarrierRecv => "FRAGMENT_TYPE_FLAG_BARRIER_RECV",
1824            Self::Values => "FRAGMENT_TYPE_FLAG_VALUES",
1825            Self::Dml => "FRAGMENT_TYPE_FLAG_DML",
1826            Self::CdcFilter => "FRAGMENT_TYPE_FLAG_CDC_FILTER",
1827            Self::SourceScan => "FRAGMENT_TYPE_FLAG_SOURCE_SCAN",
1828            Self::SnapshotBackfillStreamScan => {
1829                "FRAGMENT_TYPE_FLAG_SNAPSHOT_BACKFILL_STREAM_SCAN"
1830            }
1831            Self::FsFetch => "FRAGMENT_TYPE_FLAG_FS_FETCH",
1832            Self::CrossDbSnapshotBackfillStreamScan => {
1833                "FRAGMENT_TYPE_FLAG_CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN"
1834            }
1835        }
1836    }
1837    /// Creates an enum from field names used in the ProtoBuf definition.
1838    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
1839        match value {
1840            "FRAGMENT_TYPE_FLAG_FRAGMENT_UNSPECIFIED" => Some(Self::FragmentUnspecified),
1841            "FRAGMENT_TYPE_FLAG_SOURCE" => Some(Self::Source),
1842            "FRAGMENT_TYPE_FLAG_MVIEW" => Some(Self::Mview),
1843            "FRAGMENT_TYPE_FLAG_SINK" => Some(Self::Sink),
1844            "FRAGMENT_TYPE_FLAG_NOW" => Some(Self::Now),
1845            "FRAGMENT_TYPE_FLAG_STREAM_SCAN" => Some(Self::StreamScan),
1846            "FRAGMENT_TYPE_FLAG_BARRIER_RECV" => Some(Self::BarrierRecv),
1847            "FRAGMENT_TYPE_FLAG_VALUES" => Some(Self::Values),
1848            "FRAGMENT_TYPE_FLAG_DML" => Some(Self::Dml),
1849            "FRAGMENT_TYPE_FLAG_CDC_FILTER" => Some(Self::CdcFilter),
1850            "FRAGMENT_TYPE_FLAG_SOURCE_SCAN" => Some(Self::SourceScan),
1851            "FRAGMENT_TYPE_FLAG_SNAPSHOT_BACKFILL_STREAM_SCAN" => {
1852                Some(Self::SnapshotBackfillStreamScan)
1853            }
1854            "FRAGMENT_TYPE_FLAG_FS_FETCH" => Some(Self::FsFetch),
1855            "FRAGMENT_TYPE_FLAG_CROSS_DB_SNAPSHOT_BACKFILL_STREAM_SCAN" => {
1856                Some(Self::CrossDbSnapshotBackfillStreamScan)
1857            }
1858            _ => None,
1859        }
1860    }
1861}