// risingwave_pb/lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17#![allow(clippy::doc_lazy_continuation)]
18// for derived code of `Message`
19#![expect(clippy::doc_markdown)]
20#![expect(clippy::upper_case_acronyms)]
21#![expect(clippy::needless_lifetimes)]
22// For tonic::transport::Endpoint::connect
23#![expect(clippy::disallowed_methods)]
24#![expect(clippy::enum_variant_names)]
25#![expect(clippy::module_inception)]
26// FIXME: This should be fixed!!! https://github.com/risingwavelabs/risingwave/issues/19906
27#![expect(clippy::large_enum_variant)]
28#![feature(step_trait)]
29
30pub mod id;
31
32use std::str::FromStr;
33
34use event_recovery::RecoveryEvent;
35use plan_common::AdditionalColumn;
36pub use prost::Message;
37use risingwave_error::tonic::ToTonicStatus;
38use thiserror::Error;
39
40use crate::common::WorkerType;
41use crate::ddl_service::streaming_job_resource_type;
42use crate::id::{FragmentId, SourceId, WorkerId};
43use crate::meta::event_log::event_recovery;
44use crate::stream_plan::PbStreamScanType;
45
46#[rustfmt::skip]
47#[cfg_attr(madsim, path = "sim/catalog.rs")]
48pub mod catalog;
49#[rustfmt::skip]
50#[cfg_attr(madsim, path = "sim/common.rs")]
51pub mod common;
52#[rustfmt::skip]
53#[cfg_attr(madsim, path = "sim/compute.rs")]
54pub mod compute;
55#[rustfmt::skip]
56#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
57pub mod cloud_service;
58#[rustfmt::skip]
59#[cfg_attr(madsim, path = "sim/data.rs")]
60pub mod data;
61#[rustfmt::skip]
62#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
63pub mod ddl_service;
64#[rustfmt::skip]
65#[cfg_attr(madsim, path = "sim/expr.rs")]
66pub mod expr;
67#[rustfmt::skip]
68#[cfg_attr(madsim, path = "sim/meta.rs")]
69pub mod meta;
70#[rustfmt::skip]
71#[cfg_attr(madsim, path = "sim/plan_common.rs")]
72pub mod plan_common;
73#[rustfmt::skip]
74#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
75pub mod batch_plan;
76#[rustfmt::skip]
77#[cfg_attr(madsim, path = "sim/task_service.rs")]
78pub mod task_service;
79#[rustfmt::skip]
80#[cfg_attr(madsim, path = "sim/connector_service.rs")]
81pub mod connector_service;
82#[rustfmt::skip]
83#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
84pub mod stream_plan;
85#[rustfmt::skip]
86#[cfg_attr(madsim, path = "sim/stream_service.rs")]
87pub mod stream_service;
88#[rustfmt::skip]
89#[cfg_attr(madsim, path = "sim/hummock.rs")]
90pub mod hummock;
91#[rustfmt::skip]
92#[cfg_attr(madsim, path = "sim/compactor.rs")]
93pub mod compactor;
94#[rustfmt::skip]
95#[cfg_attr(madsim, path = "sim/user.rs")]
96pub mod user;
97#[rustfmt::skip]
98#[cfg_attr(madsim, path = "sim/source.rs")]
99pub mod source;
100#[rustfmt::skip]
101#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
102pub mod monitor_service;
103#[rustfmt::skip]
104#[cfg_attr(madsim, path = "sim/backup_service.rs")]
105pub mod backup_service;
106#[rustfmt::skip]
107#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
108pub mod serverless_backfill_controller;
109#[rustfmt::skip]
110#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
111pub mod frontend_service;
112#[rustfmt::skip]
113#[cfg_attr(madsim, path = "sim/java_binding.rs")]
114pub mod java_binding;
115#[rustfmt::skip]
116#[cfg_attr(madsim, path = "sim/health.rs")]
117pub mod health;
118#[rustfmt::skip]
119#[path = "sim/telemetry.rs"]
120pub mod telemetry;
121#[rustfmt::skip]
122#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
123pub mod iceberg_compaction;
124
125#[rustfmt::skip]
126#[path = "sim/secret.rs"]
127pub mod secret;
128#[rustfmt::skip]
129#[path = "connector_service.serde.rs"]
130pub mod connector_service_serde;
131#[rustfmt::skip]
132#[path = "catalog.serde.rs"]
133pub mod catalog_serde;
134#[rustfmt::skip]
135#[path = "common.serde.rs"]
136pub mod common_serde;
137#[rustfmt::skip]
138#[path = "compute.serde.rs"]
139pub mod compute_serde;
140#[rustfmt::skip]
141#[path = "cloud_service.serde.rs"]
142pub mod cloud_service_serde;
143#[rustfmt::skip]
144#[path = "data.serde.rs"]
145pub mod data_serde;
146#[rustfmt::skip]
147#[path = "ddl_service.serde.rs"]
148pub mod ddl_service_serde;
149#[rustfmt::skip]
150#[path = "expr.serde.rs"]
151pub mod expr_serde;
152#[rustfmt::skip]
153#[path = "meta.serde.rs"]
154pub mod meta_serde;
155#[rustfmt::skip]
156#[path = "plan_common.serde.rs"]
157pub mod plan_common_serde;
158#[rustfmt::skip]
159#[path = "batch_plan.serde.rs"]
160pub mod batch_plan_serde;
161#[rustfmt::skip]
162#[path = "task_service.serde.rs"]
163pub mod task_service_serde;
164#[rustfmt::skip]
165#[path = "stream_plan.serde.rs"]
166pub mod stream_plan_serde;
167#[rustfmt::skip]
168#[path = "stream_service.serde.rs"]
169pub mod stream_service_serde;
170#[rustfmt::skip]
171#[path = "hummock.serde.rs"]
172pub mod hummock_serde;
173#[rustfmt::skip]
174#[path = "compactor.serde.rs"]
175pub mod compactor_serde;
176#[rustfmt::skip]
177#[path = "user.serde.rs"]
178pub mod user_serde;
179#[rustfmt::skip]
180#[path = "source.serde.rs"]
181pub mod source_serde;
182#[rustfmt::skip]
183#[path = "monitor_service.serde.rs"]
184pub mod monitor_service_serde;
185#[rustfmt::skip]
186#[path = "backup_service.serde.rs"]
187pub mod backup_service_serde;
188#[rustfmt::skip]
189#[path = "java_binding.serde.rs"]
190pub mod java_binding_serde;
191#[rustfmt::skip]
192#[path = "telemetry.serde.rs"]
193pub mod telemetry_serde;
194
195#[rustfmt::skip]
196#[path = "secret.serde.rs"]
197pub mod secret_serde;
198#[rustfmt::skip]
199#[path = "serverless_backfill_controller.serde.rs"]
200pub mod serverless_backfill_controller_serde;
201
/// Error for a protobuf field that was expected but not present.
/// Carries the field's name, which is interpolated into the error message.
#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
pub struct PbFieldNotFound(pub &'static str);
205
impl From<PbFieldNotFound> for tonic::Status {
    fn from(e: PbFieldNotFound) -> Self {
        // Surface a missing-field error over gRPC as `Internal`, without a
        // service name attached (hence `to_status_unnamed`).
        e.to_status_unnamed(tonic::Code::Internal)
    }
}
211
212impl FromStr for crate::expr::table_function::PbType {
213    type Err = ();
214
215    fn from_str(s: &str) -> Result<Self, Self::Err> {
216        Self::from_str_name(&s.to_uppercase()).ok_or(())
217    }
218}
219
220impl FromStr for crate::expr::agg_call::PbKind {
221    type Err = ();
222
223    fn from_str(s: &str) -> Result<Self, Self::Err> {
224        Self::from_str_name(&s.to_uppercase()).ok_or(())
225    }
226}
227
228impl stream_plan::MaterializeNode {
229    pub fn dist_key_indices(&self) -> Vec<u32> {
230        self.get_table()
231            .unwrap()
232            .distribution_key
233            .iter()
234            .map(|i| *i as u32)
235            .collect()
236    }
237
238    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
239        self.get_table()
240            .unwrap()
241            .columns
242            .iter()
243            .map(|c| c.get_column_desc().unwrap().clone())
244            .collect()
245    }
246}
247
248impl stream_plan::StreamScanNode {
249    /// See [`Self::upstream_column_ids`].
250    pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
251        self.upstream_column_ids
252            .iter()
253            .map(|id| {
254                (self.table_desc.as_ref().unwrap().columns.iter())
255                    .find(|c| c.column_id == *id)
256                    .unwrap()
257                    .clone()
258            })
259            .collect()
260    }
261}
262
263impl stream_plan::SourceBackfillNode {
264    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
265        self.columns
266            .iter()
267            .map(|c| c.column_desc.as_ref().unwrap().clone())
268            .collect()
269    }
270}
271
272// Encapsulating the use of parallelism.
273impl common::WorkerNode {
274    pub fn compute_node_parallelism(&self) -> usize {
275        assert_eq!(self.r#type(), WorkerType::ComputeNode);
276        self.property
277            .as_ref()
278            .expect("property should be exist")
279            .parallelism as usize
280    }
281
282    fn compactor_node_parallelism(&self) -> usize {
283        assert_eq!(self.r#type(), WorkerType::Compactor);
284        self.property
285            .as_ref()
286            .expect("property should be exist")
287            .parallelism as usize
288    }
289
290    pub fn parallelism(&self) -> Option<usize> {
291        match self.r#type() {
292            WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
293            WorkerType::Compactor => Some(self.compactor_node_parallelism()),
294            _ => None,
295        }
296    }
297
298    pub fn resource_group(&self) -> Option<String> {
299        self.property
300            .as_ref()
301            .and_then(|p| p.resource_group.clone())
302    }
303}
304
305impl stream_plan::SourceNode {
306    pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
307        Some(
308            self.source_inner
309                .as_ref()?
310                .columns
311                .iter()
312                .map(|c| c.get_column_desc().unwrap().clone())
313                .collect(),
314        )
315    }
316}
317
318impl meta::table_fragments::ActorStatus {
319    pub fn worker_id(&self) -> WorkerId {
320        self.location
321            .as_ref()
322            .expect("actor location should be exist")
323            .worker_node_id
324    }
325}
326
impl common::ActorLocation {
    /// Builds a location from a worker node ID.
    ///
    /// Always returns `Some`; the `Option` wrapper matches the optional
    /// `location` fields on the protobuf side for caller convenience.
    pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
        Some(Self { worker_node_id })
    }
}
332
333impl meta::event_log::EventRecovery {
334    pub fn event_type(&self) -> &str {
335        match self.recovery_event.as_ref() {
336            Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
337            Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
338            Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
339            Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
340            Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
341            Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
342            None => "UNKNOWN_RECOVERY_EVENT",
343        }
344    }
345
346    pub fn database_recovery_start(database_id: u32) -> Self {
347        Self {
348            recovery_event: Some(RecoveryEvent::DatabaseStart(
349                event_recovery::DatabaseRecoveryStart { database_id },
350            )),
351        }
352    }
353
354    pub fn database_recovery_failure(database_id: u32) -> Self {
355        Self {
356            recovery_event: Some(RecoveryEvent::DatabaseFailure(
357                event_recovery::DatabaseRecoveryFailure { database_id },
358            )),
359        }
360    }
361
362    pub fn database_recovery_success(database_id: u32) -> Self {
363        Self {
364            recovery_event: Some(RecoveryEvent::DatabaseSuccess(
365                event_recovery::DatabaseRecoverySuccess { database_id },
366            )),
367        }
368    }
369
370    pub fn global_recovery_start(reason: String) -> Self {
371        Self {
372            recovery_event: Some(RecoveryEvent::GlobalStart(
373                event_recovery::GlobalRecoveryStart { reason },
374            )),
375        }
376    }
377
378    pub fn global_recovery_success(
379        reason: String,
380        duration_secs: f32,
381        running_database_ids: Vec<u32>,
382        recovering_database_ids: Vec<u32>,
383    ) -> Self {
384        Self {
385            recovery_event: Some(RecoveryEvent::GlobalSuccess(
386                event_recovery::GlobalRecoverySuccess {
387                    reason,
388                    duration_secs,
389                    running_database_ids,
390                    recovering_database_ids,
391                },
392            )),
393        }
394    }
395
396    pub fn global_recovery_failure(reason: String, error: String) -> Self {
397        Self {
398            recovery_event: Some(RecoveryEvent::GlobalFailure(
399                event_recovery::GlobalRecoveryFailure { reason, error },
400            )),
401        }
402    }
403}
404
405impl stream_plan::StreamNode {
406    /// Find the external stream source info inside the stream node, if any.
407    ///
408    /// Returns `source_id`.
409    pub fn find_stream_source(&self) -> Option<SourceId> {
410        if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
411            self.node_body.as_ref()
412            && let Some(inner) = &source.source_inner
413        {
414            return Some(inner.source_id);
415        }
416
417        for child in &self.input {
418            if let Some(source) = child.find_stream_source() {
419                return Some(source);
420            }
421        }
422
423        None
424    }
425
426    /// Find the external stream source info inside the stream node, if any.
427    ///
428    /// Returns (`source_id`, `upstream_source_fragment_id`).
429    ///
430    /// Note: we must get upstream fragment id from the merge node, not from the fragment's
431    /// `upstream_fragment_ids`. e.g., DynamicFilter may have 2 upstream fragments, but only
432    /// one is the upstream source fragment.
433    pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
434        if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
435            self.node_body.as_ref()
436        {
437            if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
438                self.input[0].node_body.as_ref().unwrap()
439            {
440                // Note: avoid using `merge.upstream_actor_id` to prevent misuse.
441                // See comments there for details.
442                return Some((source.upstream_source_id, merge.upstream_fragment_id));
443            } else {
444                unreachable!(
445                    "source backfill must have a merge node as its input: {:?}",
446                    self
447                );
448            }
449        }
450
451        for child in &self.input {
452            if let Some(source) = child.find_source_backfill() {
453                return Some(source);
454            }
455        }
456
457        None
458    }
459}
460impl stream_plan::Dispatcher {
461    pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
462        stream_plan::DispatchStrategy {
463            r#type: self.r#type,
464            dist_key_indices: self.dist_key_indices.clone(),
465            output_mapping: self.output_mapping.clone(),
466        }
467    }
468}
469
470impl stream_plan::DispatchOutputMapping {
471    /// Create a mapping that forwards all columns.
472    pub fn identical(len: usize) -> Self {
473        Self {
474            indices: (0..len as u32).collect(),
475            types: Vec::new(),
476        }
477    }
478
479    /// Create a mapping that forwards columns with given indices, without type conversion.
480    pub fn simple(indices: Vec<u32>) -> Self {
481        Self {
482            indices,
483            types: Vec::new(),
484        }
485    }
486
487    /// Assert that this mapping does not involve type conversion and return the indices.
488    pub fn into_simple_indices(self) -> Vec<u32> {
489        assert!(
490            self.types.is_empty(),
491            "types must be empty for simple mapping"
492        );
493        self.indices
494    }
495}
496
impl catalog::StreamSourceInfo {
    /// Whether this source is a shared one.
    ///
    /// Refer to [`Self::cdc_source_job`] for details.
    pub fn is_shared(&self) -> bool {
        self.cdc_source_job
    }
}
503
504impl stream_plan::PbStreamScanType {
505    pub fn is_reschedulable(&self, is_online: bool) -> bool {
506        match self {
507            PbStreamScanType::Unspecified => {
508                unreachable!()
509            }
510            // todo: should this be true?
511            PbStreamScanType::UpstreamOnly => false,
512            PbStreamScanType::ArrangementBackfill => true,
513            PbStreamScanType::CrossDbSnapshotBackfill => true,
514            PbStreamScanType::SnapshotBackfill => !is_online,
515            PbStreamScanType::Chain | PbStreamScanType::Rearrange | PbStreamScanType::Backfill => {
516                false
517            }
518        }
519    }
520}
521
522impl catalog::Sink {
523    // TODO: remove this placeholder
524    // creating table sink does not have an id, so we need a placeholder
525    pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
526
527    pub fn unique_identity(&self) -> String {
528        // TODO: use a more unique name
529        format!("{}", self.id)
530    }
531
532    /// Get `ignore_delete` with backward compatibility.
533    ///
534    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
535    #[allow(deprecated)]
536    pub fn ignore_delete(&self) -> bool {
537        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
538    }
539}
540
541impl stream_plan::SinkDesc {
542    /// Get `ignore_delete` with backward compatibility.
543    ///
544    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
545    #[allow(deprecated)]
546    pub fn ignore_delete(&self) -> bool {
547        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
548    }
549}
550
551impl connector_service::SinkParam {
552    /// Get `ignore_delete` with backward compatibility.
553    ///
554    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
555    #[allow(deprecated)]
556    pub fn ignore_delete(&self) -> bool {
557        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
558    }
559}
560
561impl catalog::Table {
562    /// Get clean watermark column indices with backward compatibility.
563    ///
564    /// Returns the new `clean_watermark_indices` if set, otherwise derives it from the old
565    /// `clean_watermark_index_in_pk` by converting PK index to column index.
566    ///
567    /// Note: a non-empty slice does not imply that the executor **SHOULD** clean this table
568    /// by watermark, but that the storage **CAN** clean this table by watermark. It's actually
569    /// the executor's responsibility to decide whether state cleaning is correct on semantics.
570    /// Besides, due to historical reasons, this method may return `[pk[0]]` even if the table
571    /// has nothing to do with watermark.
572    #[expect(deprecated)]
573    pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
574        if !self.clean_watermark_indices.is_empty() {
575            // New format: directly return clean_watermark_indices
576            self.clean_watermark_indices.clone()
577        } else if let Some(pk_idx) = self
578            .clean_watermark_index_in_pk
579            // At the very beginning, the watermark index was hard-coded to the first column of the pk.
580            .or_else(|| (!self.pk.is_empty()).then_some(0))
581        {
582            // Old format: convert PK index to column index
583            // The pk_idx is the position in the PK, we need to find the corresponding column index
584            if let Some(col_order) = self.pk.get(pk_idx as usize) {
585                vec![col_order.column_index]
586            } else {
587                if cfg!(debug_assertions) {
588                    panic!("clean_watermark_index_in_pk is out of range: {self:?}");
589                }
590                vec![]
591            }
592        } else {
593            vec![]
594        }
595    }
596
597    /// Convert clean watermark column indices to PK indices and return the minimum.
598    /// Returns None if no clean watermark is configured.
599    ///
600    /// This is a backward-compatible method to replace the deprecated `clean_watermark_index_in_pk` field.
601    ///
602    /// Note: a `Some` return value does not imply that the executor **SHOULD** clean this table
603    /// by watermark, but that the storage **CAN** clean this table by watermark. It's actually
604    /// the executor's responsibility to decide whether state cleaning is correct on semantics.
605    /// Besides, due to historical reasons, this method may return `Some(pk[0])` even if the table
606    /// has nothing to do with watermark.
607    ///
608    /// TODO: remove this method after totally deprecating `clean_watermark_index_in_pk`.
609    pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
610        let clean_watermark_column_indices = self.get_clean_watermark_column_indices();
611
612        // Convert column indices to PK indices
613        let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
614            .iter()
615            .filter_map(|&col_idx| {
616                self.pk
617                    .iter()
618                    .position(|col_order| col_order.column_index == col_idx)
619            })
620            .collect();
621
622        // Return the minimum PK index
623        clean_watermark_indices_in_pk.iter().min().copied()
624    }
625}
626
impl std::fmt::Debug for meta::SystemParams {
    /// Directly formatting `SystemParams` can be inaccurate or leak sensitive information.
    ///
    /// Use `SystemParamsReader` instead.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Intentionally print no fields; `finish_non_exhaustive` renders `..`
        // to mark the struct as redacted.
        f.debug_struct("SystemParams").finish_non_exhaustive()
    }
}
635
636// More compact formats for debugging
637
638impl std::fmt::Debug for data::DataType {
639    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
640        let data::DataType {
641            precision,
642            scale,
643            interval_type,
644            field_type,
645            field_names,
646            field_ids,
647            type_name,
648            // currently all data types are nullable
649            is_nullable: _,
650        } = self;
651
652        let type_name = data::data_type::TypeName::try_from(*type_name)
653            .map(|t| t.as_str_name())
654            .unwrap_or("Unknown");
655
656        let mut s = f.debug_struct(type_name);
657        if self.precision != 0 {
658            s.field("precision", precision);
659        }
660        if self.scale != 0 {
661            s.field("scale", scale);
662        }
663        if self.interval_type != 0 {
664            s.field("interval_type", interval_type);
665        }
666        if !self.field_type.is_empty() {
667            s.field("field_type", field_type);
668        }
669        if !self.field_names.is_empty() {
670            s.field("field_names", field_names);
671        }
672        if !self.field_ids.is_empty() {
673            s.field("field_ids", field_ids);
674        }
675        s.finish()
676    }
677}
678
679impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
680    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681        match self {
682            Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
683            Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
684        }
685    }
686}
687
688impl std::fmt::Debug for plan_common::ColumnDesc {
689    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690        // destruct here to avoid missing new fields in the future.
691        let plan_common::ColumnDesc {
692            column_type,
693            column_id,
694            name,
695            description,
696            additional_column_type,
697            additional_column,
698            generated_or_default_column,
699            version,
700            nullable,
701        } = self;
702
703        let mut s = f.debug_struct("ColumnDesc");
704        if let Some(column_type) = column_type {
705            s.field("column_type", column_type);
706        } else {
707            s.field("column_type", &"Unknown");
708        }
709        s.field("column_id", column_id).field("name", name);
710        if let Some(description) = description {
711            s.field("description", description);
712        }
713        if self.additional_column_type != 0 {
714            s.field("additional_column_type", additional_column_type);
715        }
716        s.field("version", version);
717        if let Some(AdditionalColumn {
718            column_type: Some(column_type),
719        }) = additional_column
720        {
721            // AdditionalColumn { None } means a normal column
722            s.field("additional_column", &column_type);
723        }
724        if let Some(generated_or_default_column) = generated_or_default_column {
725            s.field("generated_or_default_column", &generated_or_default_column);
726        }
727        s.field("nullable", nullable);
728        s.finish()
729    }
730}
731
732impl expr::UserDefinedFunction {
733    pub fn name_in_runtime(&self) -> Option<&str> {
734        if self.version() < expr::UdfExprVersion::NameInRuntime {
735            if self.language == "rust" || self.language == "wasm" {
736                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
737                // is not used any more. The real bound function name should be the same
738                // as `name`.
739                Some(&self.name)
740            } else {
741                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
742                self.identifier.as_deref()
743            }
744        } else {
745            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
746            self.identifier.as_deref()
747        }
748    }
749}
750
751impl expr::UserDefinedFunctionMetadata {
752    pub fn name_in_runtime(&self) -> Option<&str> {
753        if self.version() < expr::UdfExprVersion::NameInRuntime {
754            if self.language == "rust" || self.language == "wasm" {
755                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
756                // is not used any more. And unfortunately, we don't have the original name
757                // in `PbUserDefinedFunctionMetadata`, so we need to extract the name from
758                // the old `identifier` value (e.g. `foo()->int32`).
759                let old_identifier = self
760                    .identifier
761                    .as_ref()
762                    .expect("Rust/WASM UDF must have identifier");
763                Some(
764                    old_identifier
765                        .split_once("(")
766                        .expect("the old identifier must contain `(`")
767                        .0,
768                )
769            } else {
770                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
771                self.identifier.as_deref()
772            }
773        } else {
774            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
775            self.identifier.as_deref()
776        }
777    }
778}
779
780impl streaming_job_resource_type::ResourceType {
781    pub fn resource_group(&self) -> Option<String> {
782        match self {
783            streaming_job_resource_type::ResourceType::Regular(_) => None,
784            streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
785            | streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group) => {
786                Some(group.clone())
787            }
788        }
789    }
790}
791
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    #[test]
    fn test_getter() {
        let field = Field {
            data_type: Some(DataType {
                is_nullable: true,
                ..Default::default()
            }),
            name: "".to_owned(),
        };
        assert!(field.get_data_type().unwrap().is_nullable);
    }

    #[test]
    fn test_enum_getter() {
        let data_type = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        assert_eq!(
            data_type::TypeName::Double,
            data_type.get_type_name().unwrap()
        );
    }

    #[test]
    fn test_enum_unspecified() {
        let data_type = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(data_type.get_type_name().is_err());
    }

    #[test]
    fn test_primitive_getter() {
        let new_data_type = DataType {
            is_nullable: DataType::default().get_is_nullable(),
            ..Default::default()
        };
        assert!(!new_data_type.is_nullable);
    }

    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;
        // box all fields in NodeBody to avoid large_enum_variant
        // see https://github.com/risingwavelabs/risingwave/issues/19910
        const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
    }

    #[test]
    #[expect(deprecated)]
    fn test_get_clean_watermark_index_in_pk_compat() {
        use crate::catalog::Table;
        use crate::common::{ColumnOrder, OrderType};

        fn pk_col(column_index: u32) -> ColumnOrder {
            ColumnOrder {
                column_index,
                order_type: Some(OrderType::default()),
            }
        }

        // Test case 1: both fields are set; the new field takes precedence.
        let table = Table {
            clean_watermark_indices: vec![3, 2],
            clean_watermark_index_in_pk: Some(0),
            pk: vec![pk_col(1), pk_col(2), pk_col(3), pk_col(4)],
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Test case 2: only the old field is set.
        let table = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: Some(1),
            pk: vec![pk_col(0), pk_col(2)],
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Test case 3: no clean watermark configured.
        let table = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: None,
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), None);
    }
}