risingwave_pb/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17// for derived code of `Message`
18#![expect(clippy::doc_markdown)]
19#![expect(clippy::upper_case_acronyms)]
20#![expect(clippy::needless_lifetimes)]
21// For tonic::transport::Endpoint::connect
22#![expect(clippy::disallowed_methods)]
23#![expect(clippy::enum_variant_names)]
24#![expect(clippy::module_inception)]
25// FIXME: This should be fixed!!! https://github.com/risingwavelabs/risingwave/issues/19906
26#![expect(clippy::large_enum_variant)]
27#![feature(step_trait)]
28
29pub mod id;
30
31use std::str::FromStr;
32
33use event_recovery::RecoveryEvent;
34use plan_common::AdditionalColumn;
35pub use prost::Message;
36use risingwave_error::tonic::ToTonicStatus;
37use thiserror::Error;
38
39use crate::common::WorkerType;
40use crate::ddl_service::streaming_job_resource_type;
41use crate::id::{FragmentId, SourceId, WorkerId};
42use crate::meta::event_log::event_recovery;
43use crate::stream_plan::PbStreamScanType;
44
// Generated protobuf message/service modules (produced by `prost`).
//
// Most modules select an alternative set of generated files under `sim/`
// when building for the `madsim` deterministic simulation via `cfg_attr`.
// NOTE(review): `telemetry` and `secret` below use an *unconditional*
// `#[path = "sim/..."]` rather than `cfg_attr(madsim, ...)` — confirm
// this asymmetry is intentional.
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/catalog.rs")]
pub mod catalog;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/common.rs")]
pub mod common;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/compute.rs")]
pub mod compute;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
pub mod cloud_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/data.rs")]
pub mod data;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
pub mod ddl_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/expr.rs")]
pub mod expr;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/meta.rs")]
pub mod meta;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/plan_common.rs")]
pub mod plan_common;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
pub mod batch_plan;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/task_service.rs")]
pub mod task_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/connector_service.rs")]
pub mod connector_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
pub mod stream_plan;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/stream_service.rs")]
pub mod stream_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/hummock.rs")]
pub mod hummock;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/compactor.rs")]
pub mod compactor;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/user.rs")]
pub mod user;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/source.rs")]
pub mod source;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
pub mod monitor_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/backup_service.rs")]
pub mod backup_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
pub mod serverless_backfill_controller;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
pub mod frontend_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/java_binding.rs")]
pub mod java_binding;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/health.rs")]
pub mod health;
#[rustfmt::skip]
#[path = "sim/telemetry.rs"]
pub mod telemetry;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
pub mod iceberg_compaction;

#[rustfmt::skip]
#[path = "sim/secret.rs"]
pub mod secret;
// `serde` (de)serialization impls for the messages above, generated into
// sibling `*.serde.rs` files and exposed as `*_serde` modules.
#[rustfmt::skip]
#[path = "connector_service.serde.rs"]
pub mod connector_service_serde;
#[rustfmt::skip]
#[path = "catalog.serde.rs"]
pub mod catalog_serde;
#[rustfmt::skip]
#[path = "common.serde.rs"]
pub mod common_serde;
#[rustfmt::skip]
#[path = "compute.serde.rs"]
pub mod compute_serde;
#[rustfmt::skip]
#[path = "cloud_service.serde.rs"]
pub mod cloud_service_serde;
#[rustfmt::skip]
#[path = "data.serde.rs"]
pub mod data_serde;
#[rustfmt::skip]
#[path = "ddl_service.serde.rs"]
pub mod ddl_service_serde;
#[rustfmt::skip]
#[path = "expr.serde.rs"]
pub mod expr_serde;
#[rustfmt::skip]
#[path = "meta.serde.rs"]
pub mod meta_serde;
#[rustfmt::skip]
#[path = "plan_common.serde.rs"]
pub mod plan_common_serde;
#[rustfmt::skip]
#[path = "batch_plan.serde.rs"]
pub mod batch_plan_serde;
#[rustfmt::skip]
#[path = "task_service.serde.rs"]
pub mod task_service_serde;
#[rustfmt::skip]
#[path = "stream_plan.serde.rs"]
pub mod stream_plan_serde;
#[rustfmt::skip]
#[path = "stream_service.serde.rs"]
pub mod stream_service_serde;
#[rustfmt::skip]
#[path = "hummock.serde.rs"]
pub mod hummock_serde;
#[rustfmt::skip]
#[path = "compactor.serde.rs"]
pub mod compactor_serde;
#[rustfmt::skip]
#[path = "user.serde.rs"]
pub mod user_serde;
#[rustfmt::skip]
#[path = "source.serde.rs"]
pub mod source_serde;
#[rustfmt::skip]
#[path = "monitor_service.serde.rs"]
pub mod monitor_service_serde;
#[rustfmt::skip]
#[path = "backup_service.serde.rs"]
pub mod backup_service_serde;
#[rustfmt::skip]
#[path = "java_binding.serde.rs"]
pub mod java_binding_serde;
#[rustfmt::skip]
#[path = "telemetry.serde.rs"]
pub mod telemetry_serde;

#[rustfmt::skip]
#[path = "secret.serde.rs"]
pub mod secret_serde;
#[rustfmt::skip]
#[path = "serverless_backfill_controller.serde.rs"]
pub mod serverless_backfill_controller_serde;
200
/// Error indicating that a field expected to be present was absent when
/// converting a protobuf message into an internal representation.
///
/// The payload is the name of the missing field.
#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
pub struct PbFieldNotFound(pub &'static str);
204
impl From<PbFieldNotFound> for tonic::Status {
    fn from(e: PbFieldNotFound) -> Self {
        // A missing field is a protocol-level bug on the sender side,
        // so surface it as an internal gRPC error.
        e.to_status_unnamed(tonic::Code::Internal)
    }
}
210
211impl FromStr for crate::expr::table_function::PbType {
212    type Err = ();
213
214    fn from_str(s: &str) -> Result<Self, Self::Err> {
215        Self::from_str_name(&s.to_uppercase()).ok_or(())
216    }
217}
218
219impl FromStr for crate::expr::agg_call::PbKind {
220    type Err = ();
221
222    fn from_str(s: &str) -> Result<Self, Self::Err> {
223        Self::from_str_name(&s.to_uppercase()).ok_or(())
224    }
225}
226
227impl stream_plan::MaterializeNode {
228    pub fn dist_key_indices(&self) -> Vec<u32> {
229        self.get_table()
230            .unwrap()
231            .distribution_key
232            .iter()
233            .map(|i| *i as u32)
234            .collect()
235    }
236
237    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
238        self.get_table()
239            .unwrap()
240            .columns
241            .iter()
242            .map(|c| c.get_column_desc().unwrap().clone())
243            .collect()
244    }
245}
246
247impl stream_plan::StreamScanNode {
248    /// See [`Self::upstream_column_ids`].
249    pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
250        self.upstream_column_ids
251            .iter()
252            .map(|id| {
253                (self.table_desc.as_ref().unwrap().columns.iter())
254                    .find(|c| c.column_id == *id)
255                    .unwrap()
256                    .clone()
257            })
258            .collect()
259    }
260}
261
262impl stream_plan::SourceBackfillNode {
263    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
264        self.columns
265            .iter()
266            .map(|c| c.column_desc.as_ref().unwrap().clone())
267            .collect()
268    }
269}
270
271// Encapsulating the use of parallelism.
272impl common::WorkerNode {
273    pub fn compute_node_parallelism(&self) -> usize {
274        assert_eq!(self.r#type(), WorkerType::ComputeNode);
275        self.property
276            .as_ref()
277            .expect("property should be exist")
278            .parallelism as usize
279    }
280
281    fn compactor_node_parallelism(&self) -> usize {
282        assert_eq!(self.r#type(), WorkerType::Compactor);
283        self.property
284            .as_ref()
285            .expect("property should be exist")
286            .parallelism as usize
287    }
288
289    pub fn parallelism(&self) -> Option<usize> {
290        match self.r#type() {
291            WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
292            WorkerType::Compactor => Some(self.compactor_node_parallelism()),
293            _ => None,
294        }
295    }
296
297    pub fn resource_group(&self) -> Option<String> {
298        self.property
299            .as_ref()
300            .and_then(|p| p.resource_group.clone())
301    }
302}
303
304impl stream_plan::SourceNode {
305    pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
306        Some(
307            self.source_inner
308                .as_ref()?
309                .columns
310                .iter()
311                .map(|c| c.get_column_desc().unwrap().clone())
312                .collect(),
313        )
314    }
315}
316
317impl meta::table_fragments::ActorStatus {
318    pub fn worker_id(&self) -> WorkerId {
319        self.location
320            .as_ref()
321            .expect("actor location should be exist")
322            .worker_node_id
323    }
324}
325
326impl common::WorkerNode {
327    pub fn is_streaming_schedulable(&self) -> bool {
328        let property = self.property.as_ref();
329        property.is_some_and(|p| p.is_streaming) && !property.is_some_and(|p| p.is_unschedulable)
330    }
331}
332
impl common::ActorLocation {
    /// Builds a location referring to the given worker.
    ///
    /// Always returns `Some`; the `Option` wrapper lets call sites assign the
    /// result directly to optional `location` fields.
    pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
        Some(Self { worker_node_id })
    }
}
338
339impl meta::event_log::EventRecovery {
340    pub fn event_type(&self) -> &str {
341        match self.recovery_event.as_ref() {
342            Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
343            Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
344            Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
345            Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
346            Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
347            Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
348            None => "UNKNOWN_RECOVERY_EVENT",
349        }
350    }
351
352    pub fn database_recovery_start(database_id: u32) -> Self {
353        Self {
354            recovery_event: Some(RecoveryEvent::DatabaseStart(
355                event_recovery::DatabaseRecoveryStart { database_id },
356            )),
357        }
358    }
359
360    pub fn database_recovery_failure(database_id: u32) -> Self {
361        Self {
362            recovery_event: Some(RecoveryEvent::DatabaseFailure(
363                event_recovery::DatabaseRecoveryFailure { database_id },
364            )),
365        }
366    }
367
368    pub fn database_recovery_success(database_id: u32) -> Self {
369        Self {
370            recovery_event: Some(RecoveryEvent::DatabaseSuccess(
371                event_recovery::DatabaseRecoverySuccess { database_id },
372            )),
373        }
374    }
375
376    pub fn global_recovery_start(reason: String) -> Self {
377        Self {
378            recovery_event: Some(RecoveryEvent::GlobalStart(
379                event_recovery::GlobalRecoveryStart { reason },
380            )),
381        }
382    }
383
384    pub fn global_recovery_success(
385        reason: String,
386        duration_secs: f32,
387        running_database_ids: Vec<u32>,
388        recovering_database_ids: Vec<u32>,
389    ) -> Self {
390        Self {
391            recovery_event: Some(RecoveryEvent::GlobalSuccess(
392                event_recovery::GlobalRecoverySuccess {
393                    reason,
394                    duration_secs,
395                    running_database_ids,
396                    recovering_database_ids,
397                },
398            )),
399        }
400    }
401
402    pub fn global_recovery_failure(reason: String, error: String) -> Self {
403        Self {
404            recovery_event: Some(RecoveryEvent::GlobalFailure(
405                event_recovery::GlobalRecoveryFailure { reason, error },
406            )),
407        }
408    }
409}
410
411impl stream_plan::StreamNode {
412    /// Find the external stream source info inside the stream node, if any.
413    ///
414    /// Returns `source_id`.
415    pub fn find_stream_source(&self) -> Option<SourceId> {
416        if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
417            self.node_body.as_ref()
418            && let Some(inner) = &source.source_inner
419        {
420            return Some(inner.source_id);
421        }
422
423        for child in &self.input {
424            if let Some(source) = child.find_stream_source() {
425                return Some(source);
426            }
427        }
428
429        None
430    }
431
432    /// Find the external stream source info inside the stream node, if any.
433    ///
434    /// Returns (`source_id`, `upstream_source_fragment_id`).
435    ///
436    /// Note: we must get upstream fragment id from the merge node, not from the fragment's
437    /// `upstream_fragment_ids`. e.g., DynamicFilter may have 2 upstream fragments, but only
438    /// one is the upstream source fragment.
439    pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
440        if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
441            self.node_body.as_ref()
442        {
443            if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
444                self.input[0].node_body.as_ref().unwrap()
445            {
446                // Note: avoid using `merge.upstream_actor_id` to prevent misuse.
447                // See comments there for details.
448                return Some((source.upstream_source_id, merge.upstream_fragment_id));
449            } else {
450                unreachable!(
451                    "source backfill must have a merge node as its input: {:?}",
452                    self
453                );
454            }
455        }
456
457        for child in &self.input {
458            if let Some(source) = child.find_source_backfill() {
459                return Some(source);
460            }
461        }
462
463        None
464    }
465}
impl stream_plan::Dispatcher {
    /// Builds a `DispatchStrategy` from this dispatcher's type, distribution
    /// key and output mapping.
    pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
        stream_plan::DispatchStrategy {
            r#type: self.r#type,
            dist_key_indices: self.dist_key_indices.clone(),
            output_mapping: self.output_mapping.clone(),
        }
    }
}
475
476impl stream_plan::DispatchOutputMapping {
477    /// Create a mapping that forwards all columns.
478    pub fn identical(len: usize) -> Self {
479        Self {
480            indices: (0..len as u32).collect(),
481            types: Vec::new(),
482        }
483    }
484
485    /// Create a mapping that forwards columns with given indices, without type conversion.
486    pub fn simple(indices: Vec<u32>) -> Self {
487        Self {
488            indices,
489            types: Vec::new(),
490        }
491    }
492
493    /// Assert that this mapping does not involve type conversion and return the indices.
494    pub fn into_simple_indices(self) -> Vec<u32> {
495        assert!(
496            self.types.is_empty(),
497            "types must be empty for simple mapping"
498        );
499        self.indices
500    }
501}
502
impl catalog::StreamSourceInfo {
    /// Whether this is a shared source. Refer to [`Self::cdc_source_job`] for details.
    pub fn is_shared(&self) -> bool {
        self.cdc_source_job
    }
}
509
impl stream_plan::PbStreamScanType {
    /// Whether a stream scan of this type can be rescheduled (moved between
    /// workers / have its parallelism changed) while running.
    ///
    /// `is_online`: whether the job is currently online; only affects
    /// `SnapshotBackfill`, which is reschedulable only when offline.
    pub fn is_reschedulable(&self, is_online: bool) -> bool {
        match self {
            PbStreamScanType::Unspecified => {
                unreachable!()
            }
            // todo: should this be true?
            PbStreamScanType::UpstreamOnly => false,
            PbStreamScanType::ArrangementBackfill => true,
            PbStreamScanType::CrossDbSnapshotBackfill => true,
            PbStreamScanType::SnapshotBackfill => !is_online,
            // Legacy scan types are not reschedulable.
            PbStreamScanType::Chain | PbStreamScanType::Rearrange | PbStreamScanType::Backfill => {
                false
            }
        }
    }
}
527
528impl catalog::Sink {
529    // TODO: remove this placeholder
530    // creating table sink does not have an id, so we need a placeholder
531    pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
532
533    pub fn unique_identity(&self) -> String {
534        // TODO: use a more unique name
535        format!("{}", self.id)
536    }
537
538    /// Get `ignore_delete` with backward compatibility.
539    ///
540    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
541    #[allow(deprecated)]
542    pub fn ignore_delete(&self) -> bool {
543        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
544    }
545}
546
impl stream_plan::SinkDesc {
    /// Get `ignore_delete` with backward compatibility.
    ///
    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
    #[allow(deprecated)]
    pub fn ignore_delete(&self) -> bool {
        // New explicit flag OR legacy encoding via the sink type.
        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
    }
}
556
impl connector_service::SinkParam {
    /// Get `ignore_delete` with backward compatibility.
    ///
    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
    #[allow(deprecated)]
    pub fn ignore_delete(&self) -> bool {
        // New explicit flag OR legacy encoding via the sink type.
        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
    }
}
566
567impl catalog::Table {
568    /// Get clean watermark column indices with backward compatibility.
569    ///
570    /// Returns the new `clean_watermark_indices` if set, otherwise derives it from the old
571    /// `clean_watermark_index_in_pk` by converting PK index to column index.
572    ///
573    /// Note: a non-empty slice does not imply that the executor **SHOULD** clean this table
574    /// by watermark, but that the storage **CAN** clean this table by watermark. It's actually
575    /// the executor's responsibility to decide whether state cleaning is correct on semantics.
576    /// Besides, due to historical reasons, this method may return `[pk[0]]` even if the table
577    /// has nothing to do with watermark.
578    #[expect(deprecated)]
579    pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
580        if !self.clean_watermark_indices.is_empty() {
581            // New format: directly return clean_watermark_indices
582            self.clean_watermark_indices.clone()
583        } else if let Some(pk_idx) = self
584            .clean_watermark_index_in_pk
585            // At the very beginning, the watermark index was hard-coded to the first column of the pk.
586            .or_else(|| (!self.pk.is_empty()).then_some(0))
587        {
588            // Old format: convert PK index to column index
589            // The pk_idx is the position in the PK, we need to find the corresponding column index
590            if let Some(col_order) = self.pk.get(pk_idx as usize) {
591                vec![col_order.column_index]
592            } else {
593                if cfg!(debug_assertions) {
594                    panic!("clean_watermark_index_in_pk is out of range: {self:?}");
595                }
596                vec![]
597            }
598        } else {
599            vec![]
600        }
601    }
602
603    /// Convert clean watermark column indices to PK indices and return the minimum.
604    /// Returns None if no clean watermark is configured.
605    ///
606    /// This is a backward-compatible method to replace the deprecated `clean_watermark_index_in_pk` field.
607    ///
608    /// Note: a `Some` return value does not imply that the executor **SHOULD** clean this table
609    /// by watermark, but that the storage **CAN** clean this table by watermark. It's actually
610    /// the executor's responsibility to decide whether state cleaning is correct on semantics.
611    /// Besides, due to historical reasons, this method may return `Some(pk[0])` even if the table
612    /// has nothing to do with watermark.
613    ///
614    /// TODO: remove this method after totally deprecating `clean_watermark_index_in_pk`.
615    pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
616        let clean_watermark_column_indices = self.get_clean_watermark_column_indices();
617
618        // Convert column indices to PK indices
619        let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
620            .iter()
621            .filter_map(|&col_idx| {
622                self.pk
623                    .iter()
624                    .position(|col_order| col_order.column_index == col_idx)
625            })
626            .collect();
627
628        // Return the minimum PK index
629        clean_watermark_indices_in_pk.iter().min().copied()
630    }
631}
632
impl std::fmt::Debug for meta::SystemParams {
    /// Directly formatting `SystemParams` can be inaccurate or leak sensitive information.
    ///
    /// Use `SystemParamsReader` instead.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Intentionally print no fields at all.
        f.debug_struct("SystemParams").finish_non_exhaustive()
    }
}
641
642// More compact formats for debugging
643
644impl std::fmt::Debug for data::DataType {
645    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
646        let data::DataType {
647            precision,
648            scale,
649            interval_type,
650            field_type,
651            field_names,
652            field_ids,
653            type_name,
654            // currently all data types are nullable
655            is_nullable: _,
656        } = self;
657
658        let type_name = data::data_type::TypeName::try_from(*type_name)
659            .map(|t| t.as_str_name())
660            .unwrap_or("Unknown");
661
662        let mut s = f.debug_struct(type_name);
663        if self.precision != 0 {
664            s.field("precision", precision);
665        }
666        if self.scale != 0 {
667            s.field("scale", scale);
668        }
669        if self.interval_type != 0 {
670            s.field("interval_type", interval_type);
671        }
672        if !self.field_type.is_empty() {
673            s.field("field_type", field_type);
674        }
675        if !self.field_names.is_empty() {
676            s.field("field_names", field_names);
677        }
678        if !self.field_ids.is_empty() {
679            s.field("field_ids", field_ids);
680        }
681        s.finish()
682    }
683}
684
685impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
686    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
687        match self {
688            Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
689            Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
690        }
691    }
692}
693
694impl std::fmt::Debug for plan_common::ColumnDesc {
695    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
696        // destruct here to avoid missing new fields in the future.
697        let plan_common::ColumnDesc {
698            column_type,
699            column_id,
700            name,
701            description,
702            additional_column_type,
703            additional_column,
704            generated_or_default_column,
705            version,
706            nullable,
707        } = self;
708
709        let mut s = f.debug_struct("ColumnDesc");
710        if let Some(column_type) = column_type {
711            s.field("column_type", column_type);
712        } else {
713            s.field("column_type", &"Unknown");
714        }
715        s.field("column_id", column_id).field("name", name);
716        if let Some(description) = description {
717            s.field("description", description);
718        }
719        if self.additional_column_type != 0 {
720            s.field("additional_column_type", additional_column_type);
721        }
722        s.field("version", version);
723        if let Some(AdditionalColumn {
724            column_type: Some(column_type),
725        }) = additional_column
726        {
727            // AdditionalColumn { None } means a normal column
728            s.field("additional_column", &column_type);
729        }
730        if let Some(generated_or_default_column) = generated_or_default_column {
731            s.field("generated_or_default_column", &generated_or_default_column);
732        }
733        s.field("nullable", nullable);
734        s.finish()
735    }
736}
737
738impl expr::UserDefinedFunction {
739    pub fn name_in_runtime(&self) -> Option<&str> {
740        if self.version() < expr::UdfExprVersion::NameInRuntime {
741            if self.language == "rust" || self.language == "wasm" {
742                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
743                // is not used any more. The real bound function name should be the same
744                // as `name`.
745                Some(&self.name)
746            } else {
747                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
748                self.identifier.as_deref()
749            }
750        } else {
751            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
752            self.identifier.as_deref()
753        }
754    }
755}
756
757impl expr::UserDefinedFunctionMetadata {
758    pub fn name_in_runtime(&self) -> Option<&str> {
759        if self.version() < expr::UdfExprVersion::NameInRuntime {
760            if self.language == "rust" || self.language == "wasm" {
761                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
762                // is not used any more. And unfortunately, we don't have the original name
763                // in `PbUserDefinedFunctionMetadata`, so we need to extract the name from
764                // the old `identifier` value (e.g. `foo()->int32`).
765                let old_identifier = self
766                    .identifier
767                    .as_ref()
768                    .expect("Rust/WASM UDF must have identifier");
769                Some(
770                    old_identifier
771                        .split_once("(")
772                        .expect("the old identifier must contain `(`")
773                        .0,
774                )
775            } else {
776                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
777                self.identifier.as_deref()
778            }
779        } else {
780            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
781            self.identifier.as_deref()
782        }
783    }
784}
785
786impl streaming_job_resource_type::ResourceType {
787    pub fn resource_group(&self) -> Option<String> {
788        match self {
789            streaming_job_resource_type::ResourceType::Regular(_) => None,
790            streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
791            | streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group) => {
792                Some(group.clone())
793            }
794        }
795    }
796}
797
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    #[test]
    fn test_getter() {
        let data_type: DataType = DataType {
            is_nullable: true,
            ..Default::default()
        };
        let field = Field {
            data_type: Some(data_type),
            name: String::new(),
        };
        assert!(field.get_data_type().unwrap().is_nullable);
    }

    #[test]
    fn test_enum_getter() {
        // Use struct-update syntax instead of mutate-after-default
        // (clippy::field_reassign_with_default).
        let data_type = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        assert_eq!(
            data_type::TypeName::Double,
            data_type.get_type_name().unwrap()
        );
    }

    #[test]
    fn test_enum_unspecified() {
        let data_type = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(data_type.get_type_name().is_err());
    }

    #[test]
    fn test_primitive_getter() {
        let data_type: DataType = DataType::default();
        let new_data_type = DataType {
            is_nullable: data_type.get_is_nullable(),
            ..Default::default()
        };
        assert!(!new_data_type.is_nullable);
    }

    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;
        // box all fields in NodeBody to avoid large_enum_variant
        // see https://github.com/risingwavelabs/risingwave/issues/19910
        const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
    }

    #[test]
    #[expect(deprecated)]
    fn test_get_clean_watermark_index_in_pk_compat() {
        use crate::catalog::Table;
        use crate::common::{ColumnOrder, OrderType};

        fn create_column_order(column_index: u32) -> ColumnOrder {
            ColumnOrder {
                column_index,
                order_type: Some(OrderType::default()),
            }
        }

        // Test case 1: both fields are set — the new field wins,
        // and the minimum PK position of its columns is returned.
        let table = Table {
            clean_watermark_indices: vec![3, 2],
            clean_watermark_index_in_pk: Some(0),
            pk: vec![
                create_column_order(1),
                create_column_order(2),
                create_column_order(3),
                create_column_order(4),
            ],
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Test case 2: only old field is set — it is passed through.
        let table = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: Some(1),
            pk: vec![create_column_order(0), create_column_order(2)],
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Test case 3: no clean watermark configured.
        let table = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: None,
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), None);
    }
}