risingwave_pb/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17// for derived code of `Message`
18#![expect(clippy::doc_markdown)]
19#![expect(clippy::upper_case_acronyms)]
20#![expect(clippy::needless_lifetimes)]
21// For tonic::transport::Endpoint::connect
22#![expect(clippy::disallowed_methods)]
23#![expect(clippy::enum_variant_names)]
24#![expect(clippy::module_inception)]
25// FIXME: This should be fixed!!! https://github.com/risingwavelabs/risingwave/issues/19906
26#![expect(clippy::large_enum_variant)]
27
28pub mod id;
29
30use std::str::FromStr;
31
32use event_recovery::RecoveryEvent;
33use plan_common::AdditionalColumn;
34pub use prost::Message;
35use risingwave_error::tonic::ToTonicStatus;
36use thiserror::Error;
37
38use crate::common::WorkerType;
39use crate::id::{FragmentId, SourceId, WorkerId};
40use crate::meta::event_log::event_recovery;
41use crate::stream_plan::PbStreamScanType;
42
43#[rustfmt::skip]
44#[cfg_attr(madsim, path = "sim/catalog.rs")]
45pub mod catalog;
46#[rustfmt::skip]
47#[cfg_attr(madsim, path = "sim/common.rs")]
48pub mod common;
49#[rustfmt::skip]
50#[cfg_attr(madsim, path = "sim/compute.rs")]
51pub mod compute;
52#[rustfmt::skip]
53#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
54pub mod cloud_service;
55#[rustfmt::skip]
56#[cfg_attr(madsim, path = "sim/data.rs")]
57pub mod data;
58#[rustfmt::skip]
59#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
60pub mod ddl_service;
61#[rustfmt::skip]
62#[cfg_attr(madsim, path = "sim/expr.rs")]
63pub mod expr;
64#[rustfmt::skip]
65#[cfg_attr(madsim, path = "sim/meta.rs")]
66pub mod meta;
67#[rustfmt::skip]
68#[cfg_attr(madsim, path = "sim/plan_common.rs")]
69pub mod plan_common;
70#[rustfmt::skip]
71#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
72pub mod batch_plan;
73#[rustfmt::skip]
74#[cfg_attr(madsim, path = "sim/task_service.rs")]
75pub mod task_service;
76#[rustfmt::skip]
77#[cfg_attr(madsim, path = "sim/connector_service.rs")]
78pub mod connector_service;
79#[rustfmt::skip]
80#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
81pub mod stream_plan;
82#[rustfmt::skip]
83#[cfg_attr(madsim, path = "sim/stream_service.rs")]
84pub mod stream_service;
85#[rustfmt::skip]
86#[cfg_attr(madsim, path = "sim/hummock.rs")]
87pub mod hummock;
88#[rustfmt::skip]
89#[cfg_attr(madsim, path = "sim/compactor.rs")]
90pub mod compactor;
91#[rustfmt::skip]
92#[cfg_attr(madsim, path = "sim/user.rs")]
93pub mod user;
94#[rustfmt::skip]
95#[cfg_attr(madsim, path = "sim/source.rs")]
96pub mod source;
97#[rustfmt::skip]
98#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
99pub mod monitor_service;
100#[rustfmt::skip]
101#[cfg_attr(madsim, path = "sim/backup_service.rs")]
102pub mod backup_service;
103#[rustfmt::skip]
104#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
105pub mod serverless_backfill_controller;
106#[rustfmt::skip]
107#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
108pub mod frontend_service;
109#[rustfmt::skip]
110#[cfg_attr(madsim, path = "sim/java_binding.rs")]
111pub mod java_binding;
112#[rustfmt::skip]
113#[cfg_attr(madsim, path = "sim/health.rs")]
114pub mod health;
115#[rustfmt::skip]
116#[path = "sim/telemetry.rs"]
117pub mod telemetry;
118#[rustfmt::skip]
119#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
120pub mod iceberg_compaction;
121
122#[rustfmt::skip]
123#[path = "sim/secret.rs"]
124pub mod secret;
125#[rustfmt::skip]
126#[path = "connector_service.serde.rs"]
127pub mod connector_service_serde;
128#[rustfmt::skip]
129#[path = "catalog.serde.rs"]
130pub mod catalog_serde;
131#[rustfmt::skip]
132#[path = "common.serde.rs"]
133pub mod common_serde;
134#[rustfmt::skip]
135#[path = "compute.serde.rs"]
136pub mod compute_serde;
137#[rustfmt::skip]
138#[path = "cloud_service.serde.rs"]
139pub mod cloud_service_serde;
140#[rustfmt::skip]
141#[path = "data.serde.rs"]
142pub mod data_serde;
143#[rustfmt::skip]
144#[path = "ddl_service.serde.rs"]
145pub mod ddl_service_serde;
146#[rustfmt::skip]
147#[path = "expr.serde.rs"]
148pub mod expr_serde;
149#[rustfmt::skip]
150#[path = "meta.serde.rs"]
151pub mod meta_serde;
152#[rustfmt::skip]
153#[path = "plan_common.serde.rs"]
154pub mod plan_common_serde;
155#[rustfmt::skip]
156#[path = "batch_plan.serde.rs"]
157pub mod batch_plan_serde;
158#[rustfmt::skip]
159#[path = "task_service.serde.rs"]
160pub mod task_service_serde;
161#[rustfmt::skip]
162#[path = "stream_plan.serde.rs"]
163pub mod stream_plan_serde;
164#[rustfmt::skip]
165#[path = "stream_service.serde.rs"]
166pub mod stream_service_serde;
167#[rustfmt::skip]
168#[path = "hummock.serde.rs"]
169pub mod hummock_serde;
170#[rustfmt::skip]
171#[path = "compactor.serde.rs"]
172pub mod compactor_serde;
173#[rustfmt::skip]
174#[path = "user.serde.rs"]
175pub mod user_serde;
176#[rustfmt::skip]
177#[path = "source.serde.rs"]
178pub mod source_serde;
179#[rustfmt::skip]
180#[path = "monitor_service.serde.rs"]
181pub mod monitor_service_serde;
182#[rustfmt::skip]
183#[path = "backup_service.serde.rs"]
184pub mod backup_service_serde;
185#[rustfmt::skip]
186#[path = "java_binding.serde.rs"]
187pub mod java_binding_serde;
188#[rustfmt::skip]
189#[path = "telemetry.serde.rs"]
190pub mod telemetry_serde;
191
192#[rustfmt::skip]
193#[path = "secret.serde.rs"]
194pub mod secret_serde;
195#[rustfmt::skip]
196#[path = "serverless_backfill_controller.serde.rs"]
197pub mod serverless_backfill_controller_serde;
198
199#[derive(Clone, PartialEq, Eq, Debug, Error)]
200#[error("field `{0}` not found")]
201pub struct PbFieldNotFound(pub &'static str);
202
203impl From<PbFieldNotFound> for tonic::Status {
204    fn from(e: PbFieldNotFound) -> Self {
205        e.to_status_unnamed(tonic::Code::Internal)
206    }
207}
208
209impl FromStr for crate::expr::table_function::PbType {
210    type Err = ();
211
212    fn from_str(s: &str) -> Result<Self, Self::Err> {
213        Self::from_str_name(&s.to_uppercase()).ok_or(())
214    }
215}
216
217impl FromStr for crate::expr::agg_call::PbKind {
218    type Err = ();
219
220    fn from_str(s: &str) -> Result<Self, Self::Err> {
221        Self::from_str_name(&s.to_uppercase()).ok_or(())
222    }
223}
224
225impl stream_plan::MaterializeNode {
226    pub fn dist_key_indices(&self) -> Vec<u32> {
227        self.get_table()
228            .unwrap()
229            .distribution_key
230            .iter()
231            .map(|i| *i as u32)
232            .collect()
233    }
234
235    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
236        self.get_table()
237            .unwrap()
238            .columns
239            .iter()
240            .map(|c| c.get_column_desc().unwrap().clone())
241            .collect()
242    }
243}
244
245impl stream_plan::StreamScanNode {
246    /// See [`Self::upstream_column_ids`].
247    pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
248        self.upstream_column_ids
249            .iter()
250            .map(|id| {
251                (self.table_desc.as_ref().unwrap().columns.iter())
252                    .find(|c| c.column_id == *id)
253                    .unwrap()
254                    .clone()
255            })
256            .collect()
257    }
258}
259
260impl stream_plan::SourceBackfillNode {
261    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
262        self.columns
263            .iter()
264            .map(|c| c.column_desc.as_ref().unwrap().clone())
265            .collect()
266    }
267}
268
269// Encapsulating the use of parallelism.
270impl common::WorkerNode {
271    pub fn compute_node_parallelism(&self) -> usize {
272        assert_eq!(self.r#type(), WorkerType::ComputeNode);
273        self.property
274            .as_ref()
275            .expect("property should be exist")
276            .parallelism as usize
277    }
278
279    fn compactor_node_parallelism(&self) -> usize {
280        assert_eq!(self.r#type(), WorkerType::Compactor);
281        self.property
282            .as_ref()
283            .expect("property should be exist")
284            .parallelism as usize
285    }
286
287    pub fn parallelism(&self) -> Option<usize> {
288        match self.r#type() {
289            WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
290            WorkerType::Compactor => Some(self.compactor_node_parallelism()),
291            _ => None,
292        }
293    }
294
295    pub fn resource_group(&self) -> Option<String> {
296        self.property
297            .as_ref()
298            .and_then(|p| p.resource_group.clone())
299    }
300}
301
302impl stream_plan::SourceNode {
303    pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
304        Some(
305            self.source_inner
306                .as_ref()?
307                .columns
308                .iter()
309                .map(|c| c.get_column_desc().unwrap().clone())
310                .collect(),
311        )
312    }
313}
314
315impl meta::table_fragments::ActorStatus {
316    pub fn worker_id(&self) -> WorkerId {
317        self.location
318            .as_ref()
319            .expect("actor location should be exist")
320            .worker_node_id
321    }
322}
323
324impl common::WorkerNode {
325    pub fn is_streaming_schedulable(&self) -> bool {
326        let property = self.property.as_ref();
327        property.is_some_and(|p| p.is_streaming) && !property.is_some_and(|p| p.is_unschedulable)
328    }
329}
330
331impl common::ActorLocation {
332    pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
333        Some(Self { worker_node_id })
334    }
335}
336
337impl meta::event_log::EventRecovery {
338    pub fn event_type(&self) -> &str {
339        match self.recovery_event.as_ref() {
340            Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
341            Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
342            Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
343            Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
344            Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
345            Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
346            None => "UNKNOWN_RECOVERY_EVENT",
347        }
348    }
349
350    pub fn database_recovery_start(database_id: u32) -> Self {
351        Self {
352            recovery_event: Some(RecoveryEvent::DatabaseStart(
353                event_recovery::DatabaseRecoveryStart { database_id },
354            )),
355        }
356    }
357
358    pub fn database_recovery_failure(database_id: u32) -> Self {
359        Self {
360            recovery_event: Some(RecoveryEvent::DatabaseFailure(
361                event_recovery::DatabaseRecoveryFailure { database_id },
362            )),
363        }
364    }
365
366    pub fn database_recovery_success(database_id: u32) -> Self {
367        Self {
368            recovery_event: Some(RecoveryEvent::DatabaseSuccess(
369                event_recovery::DatabaseRecoverySuccess { database_id },
370            )),
371        }
372    }
373
374    pub fn global_recovery_start(reason: String) -> Self {
375        Self {
376            recovery_event: Some(RecoveryEvent::GlobalStart(
377                event_recovery::GlobalRecoveryStart { reason },
378            )),
379        }
380    }
381
382    pub fn global_recovery_success(
383        reason: String,
384        duration_secs: f32,
385        running_database_ids: Vec<u32>,
386        recovering_database_ids: Vec<u32>,
387    ) -> Self {
388        Self {
389            recovery_event: Some(RecoveryEvent::GlobalSuccess(
390                event_recovery::GlobalRecoverySuccess {
391                    reason,
392                    duration_secs,
393                    running_database_ids,
394                    recovering_database_ids,
395                },
396            )),
397        }
398    }
399
400    pub fn global_recovery_failure(reason: String, error: String) -> Self {
401        Self {
402            recovery_event: Some(RecoveryEvent::GlobalFailure(
403                event_recovery::GlobalRecoveryFailure { reason, error },
404            )),
405        }
406    }
407}
408
409impl stream_plan::StreamNode {
410    /// Find the external stream source info inside the stream node, if any.
411    ///
412    /// Returns `source_id`.
413    pub fn find_stream_source(&self) -> Option<SourceId> {
414        if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
415            self.node_body.as_ref()
416            && let Some(inner) = &source.source_inner
417        {
418            return Some(inner.source_id);
419        }
420
421        for child in &self.input {
422            if let Some(source) = child.find_stream_source() {
423                return Some(source);
424            }
425        }
426
427        None
428    }
429
430    /// Find the external stream source info inside the stream node, if any.
431    ///
432    /// Returns (`source_id`, `upstream_source_fragment_id`).
433    ///
434    /// Note: we must get upstream fragment id from the merge node, not from the fragment's
435    /// `upstream_fragment_ids`. e.g., DynamicFilter may have 2 upstream fragments, but only
436    /// one is the upstream source fragment.
437    pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
438        if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
439            self.node_body.as_ref()
440        {
441            if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
442                self.input[0].node_body.as_ref().unwrap()
443            {
444                // Note: avoid using `merge.upstream_actor_id` to prevent misuse.
445                // See comments there for details.
446                return Some((source.upstream_source_id, merge.upstream_fragment_id));
447            } else {
448                unreachable!(
449                    "source backfill must have a merge node as its input: {:?}",
450                    self
451                );
452            }
453        }
454
455        for child in &self.input {
456            if let Some(source) = child.find_source_backfill() {
457                return Some(source);
458            }
459        }
460
461        None
462    }
463}
464impl stream_plan::Dispatcher {
465    pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
466        stream_plan::DispatchStrategy {
467            r#type: self.r#type,
468            dist_key_indices: self.dist_key_indices.clone(),
469            output_mapping: self.output_mapping.clone(),
470        }
471    }
472}
473
474impl stream_plan::DispatchOutputMapping {
475    /// Create a mapping that forwards all columns.
476    pub fn identical(len: usize) -> Self {
477        Self {
478            indices: (0..len as u32).collect(),
479            types: Vec::new(),
480        }
481    }
482
483    /// Create a mapping that forwards columns with given indices, without type conversion.
484    pub fn simple(indices: Vec<u32>) -> Self {
485        Self {
486            indices,
487            types: Vec::new(),
488        }
489    }
490
491    /// Assert that this mapping does not involve type conversion and return the indices.
492    pub fn into_simple_indices(self) -> Vec<u32> {
493        assert!(
494            self.types.is_empty(),
495            "types must be empty for simple mapping"
496        );
497        self.indices
498    }
499}
500
501impl catalog::StreamSourceInfo {
502    /// Refer to [`Self::cdc_source_job`] for details.
503    pub fn is_shared(&self) -> bool {
504        self.cdc_source_job
505    }
506}
507
508impl stream_plan::PbStreamScanType {
509    pub fn is_reschedulable(&self) -> bool {
510        match self {
511            // todo: should this be true?
512            PbStreamScanType::UpstreamOnly => false,
513            PbStreamScanType::ArrangementBackfill => true,
514            PbStreamScanType::CrossDbSnapshotBackfill => true,
515            PbStreamScanType::SnapshotBackfill => true,
516            _ => false,
517        }
518    }
519}
520
521impl catalog::Sink {
522    // TODO: remove this placeholder
523    // creating table sink does not have an id, so we need a placeholder
524    pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
525
526    pub fn unique_identity(&self) -> String {
527        // TODO: use a more unique name
528        format!("{}", self.id)
529    }
530}
531
532impl catalog::Table {
533    /// Get clean watermark column indices with backward compatibility.
534    /// Returns the new `clean_watermark_indices` if set, otherwise derives it from the old
535    /// `clean_watermark_index_in_pk` by converting PK index to column index.
536    #[expect(deprecated)]
537    pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
538        if !self.clean_watermark_indices.is_empty() {
539            // New format: directly return clean_watermark_indices
540            self.clean_watermark_indices.clone()
541        } else if let Some(pk_idx) = self
542            .clean_watermark_index_in_pk
543            // At the very beginning, the watermark index was hard-coded to the first column of the pk.
544            .or_else(|| (!self.pk.is_empty()).then_some(0))
545        {
546            // Old format: convert PK index to column index
547            // The pk_idx is the position in the PK, we need to find the corresponding column index
548            if let Some(col_order) = self.pk.get(pk_idx as usize) {
549                vec![col_order.column_index]
550            } else {
551                if cfg!(debug_assertions) {
552                    panic!("clean_watermark_index_in_pk is out of range: {self:?}");
553                }
554                vec![]
555            }
556        } else {
557            vec![]
558        }
559    }
560
561    /// Convert clean watermark column indices to PK indices and return the minimum.
562    /// Returns None if no clean watermark is configured.
563    /// This is a backward-compatible method to replace the deprecated `clean_watermark_index_in_pk` field.
564    /// TODO: remove this method after totally deprecating `clean_watermark_index_in_pk`.
565    pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
566        let clean_watermark_column_indices = self.get_clean_watermark_column_indices();
567
568        // Convert column indices to PK indices
569        let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
570            .iter()
571            .filter_map(|&col_idx| {
572                self.pk
573                    .iter()
574                    .position(|col_order| col_order.column_index == col_idx)
575            })
576            .collect();
577
578        // Return the minimum PK index
579        clean_watermark_indices_in_pk.iter().min().copied()
580    }
581}
582
583impl std::fmt::Debug for meta::SystemParams {
584    /// Directly formatting `SystemParams` can be inaccurate or leak sensitive information.
585    ///
586    /// Use `SystemParamsReader` instead.
587    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588        f.debug_struct("SystemParams").finish_non_exhaustive()
589    }
590}
591
592// More compact formats for debugging
593
594impl std::fmt::Debug for data::DataType {
595    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596        let data::DataType {
597            precision,
598            scale,
599            interval_type,
600            field_type,
601            field_names,
602            field_ids,
603            type_name,
604            // currently all data types are nullable
605            is_nullable: _,
606        } = self;
607
608        let type_name = data::data_type::TypeName::try_from(*type_name)
609            .map(|t| t.as_str_name())
610            .unwrap_or("Unknown");
611
612        let mut s = f.debug_struct(type_name);
613        if self.precision != 0 {
614            s.field("precision", precision);
615        }
616        if self.scale != 0 {
617            s.field("scale", scale);
618        }
619        if self.interval_type != 0 {
620            s.field("interval_type", interval_type);
621        }
622        if !self.field_type.is_empty() {
623            s.field("field_type", field_type);
624        }
625        if !self.field_names.is_empty() {
626            s.field("field_names", field_names);
627        }
628        if !self.field_ids.is_empty() {
629            s.field("field_ids", field_ids);
630        }
631        s.finish()
632    }
633}
634
635impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
636    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637        match self {
638            Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
639            Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
640        }
641    }
642}
643
644impl std::fmt::Debug for plan_common::ColumnDesc {
645    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
646        // destruct here to avoid missing new fields in the future.
647        let plan_common::ColumnDesc {
648            column_type,
649            column_id,
650            name,
651            description,
652            additional_column_type,
653            additional_column,
654            generated_or_default_column,
655            version,
656            nullable,
657        } = self;
658
659        let mut s = f.debug_struct("ColumnDesc");
660        if let Some(column_type) = column_type {
661            s.field("column_type", column_type);
662        } else {
663            s.field("column_type", &"Unknown");
664        }
665        s.field("column_id", column_id).field("name", name);
666        if let Some(description) = description {
667            s.field("description", description);
668        }
669        if self.additional_column_type != 0 {
670            s.field("additional_column_type", additional_column_type);
671        }
672        s.field("version", version);
673        if let Some(AdditionalColumn {
674            column_type: Some(column_type),
675        }) = additional_column
676        {
677            // AdditionalColumn { None } means a normal column
678            s.field("additional_column", &column_type);
679        }
680        if let Some(generated_or_default_column) = generated_or_default_column {
681            s.field("generated_or_default_column", &generated_or_default_column);
682        }
683        s.field("nullable", nullable);
684        s.finish()
685    }
686}
687
688impl expr::UserDefinedFunction {
689    pub fn name_in_runtime(&self) -> Option<&str> {
690        if self.version() < expr::UdfExprVersion::NameInRuntime {
691            if self.language == "rust" || self.language == "wasm" {
692                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
693                // is not used any more. The real bound function name should be the same
694                // as `name`.
695                Some(&self.name)
696            } else {
697                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
698                self.identifier.as_deref()
699            }
700        } else {
701            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
702            self.identifier.as_deref()
703        }
704    }
705}
706
707impl expr::UserDefinedFunctionMetadata {
708    pub fn name_in_runtime(&self) -> Option<&str> {
709        if self.version() < expr::UdfExprVersion::NameInRuntime {
710            if self.language == "rust" || self.language == "wasm" {
711                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
712                // is not used any more. And unfortunately, we don't have the original name
713                // in `PbUserDefinedFunctionMetadata`, so we need to extract the name from
714                // the old `identifier` value (e.g. `foo()->int32`).
715                let old_identifier = self
716                    .identifier
717                    .as_ref()
718                    .expect("Rust/WASM UDF must have identifier");
719                Some(
720                    old_identifier
721                        .split_once("(")
722                        .expect("the old identifier must contain `(`")
723                        .0,
724                )
725            } else {
726                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
727                self.identifier.as_deref()
728            }
729        } else {
730            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
731            self.identifier.as_deref()
732        }
733    }
734}
735
736#[cfg(test)]
737mod tests {
738    use crate::data::{DataType, data_type};
739    use crate::plan_common::Field;
740    use crate::stream_plan::stream_node::NodeBody;
741
742    #[test]
743    fn test_getter() {
744        let data_type: DataType = DataType {
745            is_nullable: true,
746            ..Default::default()
747        };
748        let field = Field {
749            data_type: Some(data_type),
750            name: "".to_owned(),
751        };
752        assert!(field.get_data_type().unwrap().is_nullable);
753    }
754
755    #[test]
756    fn test_enum_getter() {
757        let mut data_type: DataType = DataType::default();
758        data_type.type_name = data_type::TypeName::Double as i32;
759        assert_eq!(
760            data_type::TypeName::Double,
761            data_type.get_type_name().unwrap()
762        );
763    }
764
765    #[test]
766    fn test_enum_unspecified() {
767        let mut data_type: DataType = DataType::default();
768        data_type.type_name = data_type::TypeName::TypeUnspecified as i32;
769        assert!(data_type.get_type_name().is_err());
770    }
771
772    #[test]
773    fn test_primitive_getter() {
774        let data_type: DataType = DataType::default();
775        let new_data_type = DataType {
776            is_nullable: data_type.get_is_nullable(),
777            ..Default::default()
778        };
779        assert!(!new_data_type.is_nullable);
780    }
781
782    #[test]
783    fn test_size() {
784        use static_assertions::const_assert_eq;
785        // box all fields in NodeBody to avoid large_enum_variant
786        // see https://github.com/risingwavelabs/risingwave/issues/19910
787        const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
788    }
789
790    #[test]
791    #[expect(deprecated)]
792    fn test_get_clean_watermark_index_in_pk_compat() {
793        use crate::catalog::Table;
794        use crate::common::{ColumnOrder, OrderType};
795
796        fn create_column_order(column_index: u32) -> ColumnOrder {
797            ColumnOrder {
798                column_index,
799                order_type: Some(OrderType::default()),
800            }
801        }
802
803        // Test case 1: both fields are set
804        let table = Table {
805            clean_watermark_indices: vec![3, 2],
806            clean_watermark_index_in_pk: Some(0),
807            pk: vec![
808                create_column_order(1),
809                create_column_order(2),
810                create_column_order(3),
811                create_column_order(4),
812            ],
813            ..Default::default()
814        };
815        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));
816
817        // Test case 2: only old field is set
818        let table = Table {
819            clean_watermark_indices: vec![],
820            clean_watermark_index_in_pk: Some(1),
821            pk: vec![create_column_order(0), create_column_order(2)],
822            ..Default::default()
823        };
824        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));
825
826        // Test case 3: no clean watermark configured
827        let table = Table {
828            clean_watermark_indices: vec![],
829            clean_watermark_index_in_pk: None,
830            ..Default::default()
831        };
832        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), None);
833    }
834}