risingwave_pb/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17// for derived code of `Message`
18#![expect(clippy::doc_markdown)]
19#![expect(clippy::upper_case_acronyms)]
20#![expect(clippy::needless_lifetimes)]
21// For tonic::transport::Endpoint::connect
22#![expect(clippy::disallowed_methods)]
23#![expect(clippy::enum_variant_names)]
24#![expect(clippy::module_inception)]
25// FIXME: This should be fixed!!! https://github.com/risingwavelabs/risingwave/issues/19906
26#![expect(clippy::large_enum_variant)]
27
28pub mod id;
29
30use std::str::FromStr;
31
32use event_recovery::RecoveryEvent;
33use plan_common::AdditionalColumn;
34pub use prost::Message;
35use risingwave_error::tonic::ToTonicStatus;
36use thiserror::Error;
37
38use crate::common::WorkerType;
39use crate::ddl_service::streaming_job_resource_type;
40use crate::id::{FragmentId, SourceId, WorkerId};
41use crate::meta::event_log::event_recovery;
42use crate::stream_plan::PbStreamScanType;
43
44#[rustfmt::skip]
45#[cfg_attr(madsim, path = "sim/catalog.rs")]
46pub mod catalog;
47#[rustfmt::skip]
48#[cfg_attr(madsim, path = "sim/common.rs")]
49pub mod common;
50#[rustfmt::skip]
51#[cfg_attr(madsim, path = "sim/compute.rs")]
52pub mod compute;
53#[rustfmt::skip]
54#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
55pub mod cloud_service;
56#[rustfmt::skip]
57#[cfg_attr(madsim, path = "sim/data.rs")]
58pub mod data;
59#[rustfmt::skip]
60#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
61pub mod ddl_service;
62#[rustfmt::skip]
63#[cfg_attr(madsim, path = "sim/expr.rs")]
64pub mod expr;
65#[rustfmt::skip]
66#[cfg_attr(madsim, path = "sim/meta.rs")]
67pub mod meta;
68#[rustfmt::skip]
69#[cfg_attr(madsim, path = "sim/plan_common.rs")]
70pub mod plan_common;
71#[rustfmt::skip]
72#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
73pub mod batch_plan;
74#[rustfmt::skip]
75#[cfg_attr(madsim, path = "sim/task_service.rs")]
76pub mod task_service;
77#[rustfmt::skip]
78#[cfg_attr(madsim, path = "sim/connector_service.rs")]
79pub mod connector_service;
80#[rustfmt::skip]
81#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
82pub mod stream_plan;
83#[rustfmt::skip]
84#[cfg_attr(madsim, path = "sim/stream_service.rs")]
85pub mod stream_service;
86#[rustfmt::skip]
87#[cfg_attr(madsim, path = "sim/hummock.rs")]
88pub mod hummock;
89#[rustfmt::skip]
90#[cfg_attr(madsim, path = "sim/compactor.rs")]
91pub mod compactor;
92#[rustfmt::skip]
93#[cfg_attr(madsim, path = "sim/user.rs")]
94pub mod user;
95#[rustfmt::skip]
96#[cfg_attr(madsim, path = "sim/source.rs")]
97pub mod source;
98#[rustfmt::skip]
99#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
100pub mod monitor_service;
101#[rustfmt::skip]
102#[cfg_attr(madsim, path = "sim/backup_service.rs")]
103pub mod backup_service;
104#[rustfmt::skip]
105#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
106pub mod serverless_backfill_controller;
107#[rustfmt::skip]
108#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
109pub mod frontend_service;
110#[rustfmt::skip]
111#[cfg_attr(madsim, path = "sim/java_binding.rs")]
112pub mod java_binding;
113#[rustfmt::skip]
114#[cfg_attr(madsim, path = "sim/health.rs")]
115pub mod health;
116#[rustfmt::skip]
117#[path = "sim/telemetry.rs"]
118pub mod telemetry;
119#[rustfmt::skip]
120#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
121pub mod iceberg_compaction;
122
123#[rustfmt::skip]
124#[path = "sim/secret.rs"]
125pub mod secret;
126#[rustfmt::skip]
127#[path = "connector_service.serde.rs"]
128pub mod connector_service_serde;
129#[rustfmt::skip]
130#[path = "catalog.serde.rs"]
131pub mod catalog_serde;
132#[rustfmt::skip]
133#[path = "common.serde.rs"]
134pub mod common_serde;
135#[rustfmt::skip]
136#[path = "compute.serde.rs"]
137pub mod compute_serde;
138#[rustfmt::skip]
139#[path = "cloud_service.serde.rs"]
140pub mod cloud_service_serde;
141#[rustfmt::skip]
142#[path = "data.serde.rs"]
143pub mod data_serde;
144#[rustfmt::skip]
145#[path = "ddl_service.serde.rs"]
146pub mod ddl_service_serde;
147#[rustfmt::skip]
148#[path = "expr.serde.rs"]
149pub mod expr_serde;
150#[rustfmt::skip]
151#[path = "meta.serde.rs"]
152pub mod meta_serde;
153#[rustfmt::skip]
154#[path = "plan_common.serde.rs"]
155pub mod plan_common_serde;
156#[rustfmt::skip]
157#[path = "batch_plan.serde.rs"]
158pub mod batch_plan_serde;
159#[rustfmt::skip]
160#[path = "task_service.serde.rs"]
161pub mod task_service_serde;
162#[rustfmt::skip]
163#[path = "stream_plan.serde.rs"]
164pub mod stream_plan_serde;
165#[rustfmt::skip]
166#[path = "stream_service.serde.rs"]
167pub mod stream_service_serde;
168#[rustfmt::skip]
169#[path = "hummock.serde.rs"]
170pub mod hummock_serde;
171#[rustfmt::skip]
172#[path = "compactor.serde.rs"]
173pub mod compactor_serde;
174#[rustfmt::skip]
175#[path = "user.serde.rs"]
176pub mod user_serde;
177#[rustfmt::skip]
178#[path = "source.serde.rs"]
179pub mod source_serde;
180#[rustfmt::skip]
181#[path = "monitor_service.serde.rs"]
182pub mod monitor_service_serde;
183#[rustfmt::skip]
184#[path = "backup_service.serde.rs"]
185pub mod backup_service_serde;
186#[rustfmt::skip]
187#[path = "java_binding.serde.rs"]
188pub mod java_binding_serde;
189#[rustfmt::skip]
190#[path = "telemetry.serde.rs"]
191pub mod telemetry_serde;
192
193#[rustfmt::skip]
194#[path = "secret.serde.rs"]
195pub mod secret_serde;
196#[rustfmt::skip]
197#[path = "serverless_backfill_controller.serde.rs"]
198pub mod serverless_backfill_controller_serde;
199
/// Error value reporting that a field was not found.
///
/// The wrapped string is the field name; it is interpolated into the display message
/// produced by the `#[error]` attribute below (``field `<name>` not found``).
#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
pub struct PbFieldNotFound(pub &'static str);
203
204impl From<PbFieldNotFound> for tonic::Status {
205    fn from(e: PbFieldNotFound) -> Self {
206        e.to_status_unnamed(tonic::Code::Internal)
207    }
208}
209
210impl FromStr for crate::expr::table_function::PbType {
211    type Err = ();
212
213    fn from_str(s: &str) -> Result<Self, Self::Err> {
214        Self::from_str_name(&s.to_uppercase()).ok_or(())
215    }
216}
217
218impl FromStr for crate::expr::agg_call::PbKind {
219    type Err = ();
220
221    fn from_str(s: &str) -> Result<Self, Self::Err> {
222        Self::from_str_name(&s.to_uppercase()).ok_or(())
223    }
224}
225
226impl stream_plan::MaterializeNode {
227    pub fn dist_key_indices(&self) -> Vec<u32> {
228        self.get_table()
229            .unwrap()
230            .distribution_key
231            .iter()
232            .map(|i| *i as u32)
233            .collect()
234    }
235
236    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
237        self.get_table()
238            .unwrap()
239            .columns
240            .iter()
241            .map(|c| c.get_column_desc().unwrap().clone())
242            .collect()
243    }
244}
245
246impl stream_plan::StreamScanNode {
247    /// See [`Self::upstream_column_ids`].
248    pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
249        self.upstream_column_ids
250            .iter()
251            .map(|id| {
252                (self.table_desc.as_ref().unwrap().columns.iter())
253                    .find(|c| c.column_id == *id)
254                    .unwrap()
255                    .clone()
256            })
257            .collect()
258    }
259}
260
261impl stream_plan::SourceBackfillNode {
262    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
263        self.columns
264            .iter()
265            .map(|c| c.column_desc.as_ref().unwrap().clone())
266            .collect()
267    }
268}
269
270// Encapsulating the use of parallelism.
271impl common::WorkerNode {
272    pub fn compute_node_parallelism(&self) -> usize {
273        assert_eq!(self.r#type(), WorkerType::ComputeNode);
274        self.property
275            .as_ref()
276            .expect("property should be exist")
277            .parallelism as usize
278    }
279
280    fn compactor_node_parallelism(&self) -> usize {
281        assert_eq!(self.r#type(), WorkerType::Compactor);
282        self.property
283            .as_ref()
284            .expect("property should be exist")
285            .parallelism as usize
286    }
287
288    pub fn parallelism(&self) -> Option<usize> {
289        match self.r#type() {
290            WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
291            WorkerType::Compactor => Some(self.compactor_node_parallelism()),
292            _ => None,
293        }
294    }
295
296    pub fn resource_group(&self) -> Option<String> {
297        self.property
298            .as_ref()
299            .and_then(|p| p.resource_group.clone())
300    }
301}
302
303impl stream_plan::SourceNode {
304    pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
305        Some(
306            self.source_inner
307                .as_ref()?
308                .columns
309                .iter()
310                .map(|c| c.get_column_desc().unwrap().clone())
311                .collect(),
312        )
313    }
314}
315
316impl meta::table_fragments::ActorStatus {
317    pub fn worker_id(&self) -> WorkerId {
318        self.location
319            .as_ref()
320            .expect("actor location should be exist")
321            .worker_node_id
322    }
323}
324
325impl common::WorkerNode {
326    pub fn is_streaming_schedulable(&self) -> bool {
327        let property = self.property.as_ref();
328        property.is_some_and(|p| p.is_streaming) && !property.is_some_and(|p| p.is_unschedulable)
329    }
330}
331
332impl common::ActorLocation {
333    pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
334        Some(Self { worker_node_id })
335    }
336}
337
338impl meta::event_log::EventRecovery {
339    pub fn event_type(&self) -> &str {
340        match self.recovery_event.as_ref() {
341            Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
342            Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
343            Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
344            Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
345            Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
346            Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
347            None => "UNKNOWN_RECOVERY_EVENT",
348        }
349    }
350
351    pub fn database_recovery_start(database_id: u32) -> Self {
352        Self {
353            recovery_event: Some(RecoveryEvent::DatabaseStart(
354                event_recovery::DatabaseRecoveryStart { database_id },
355            )),
356        }
357    }
358
359    pub fn database_recovery_failure(database_id: u32) -> Self {
360        Self {
361            recovery_event: Some(RecoveryEvent::DatabaseFailure(
362                event_recovery::DatabaseRecoveryFailure { database_id },
363            )),
364        }
365    }
366
367    pub fn database_recovery_success(database_id: u32) -> Self {
368        Self {
369            recovery_event: Some(RecoveryEvent::DatabaseSuccess(
370                event_recovery::DatabaseRecoverySuccess { database_id },
371            )),
372        }
373    }
374
375    pub fn global_recovery_start(reason: String) -> Self {
376        Self {
377            recovery_event: Some(RecoveryEvent::GlobalStart(
378                event_recovery::GlobalRecoveryStart { reason },
379            )),
380        }
381    }
382
383    pub fn global_recovery_success(
384        reason: String,
385        duration_secs: f32,
386        running_database_ids: Vec<u32>,
387        recovering_database_ids: Vec<u32>,
388    ) -> Self {
389        Self {
390            recovery_event: Some(RecoveryEvent::GlobalSuccess(
391                event_recovery::GlobalRecoverySuccess {
392                    reason,
393                    duration_secs,
394                    running_database_ids,
395                    recovering_database_ids,
396                },
397            )),
398        }
399    }
400
401    pub fn global_recovery_failure(reason: String, error: String) -> Self {
402        Self {
403            recovery_event: Some(RecoveryEvent::GlobalFailure(
404                event_recovery::GlobalRecoveryFailure { reason, error },
405            )),
406        }
407    }
408}
409
410impl stream_plan::StreamNode {
411    /// Find the external stream source info inside the stream node, if any.
412    ///
413    /// Returns `source_id`.
414    pub fn find_stream_source(&self) -> Option<SourceId> {
415        if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
416            self.node_body.as_ref()
417            && let Some(inner) = &source.source_inner
418        {
419            return Some(inner.source_id);
420        }
421
422        for child in &self.input {
423            if let Some(source) = child.find_stream_source() {
424                return Some(source);
425            }
426        }
427
428        None
429    }
430
431    /// Find the external stream source info inside the stream node, if any.
432    ///
433    /// Returns (`source_id`, `upstream_source_fragment_id`).
434    ///
435    /// Note: we must get upstream fragment id from the merge node, not from the fragment's
436    /// `upstream_fragment_ids`. e.g., DynamicFilter may have 2 upstream fragments, but only
437    /// one is the upstream source fragment.
438    pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
439        if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
440            self.node_body.as_ref()
441        {
442            if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
443                self.input[0].node_body.as_ref().unwrap()
444            {
445                // Note: avoid using `merge.upstream_actor_id` to prevent misuse.
446                // See comments there for details.
447                return Some((source.upstream_source_id, merge.upstream_fragment_id));
448            } else {
449                unreachable!(
450                    "source backfill must have a merge node as its input: {:?}",
451                    self
452                );
453            }
454        }
455
456        for child in &self.input {
457            if let Some(source) = child.find_source_backfill() {
458                return Some(source);
459            }
460        }
461
462        None
463    }
464}
465impl stream_plan::Dispatcher {
466    pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
467        stream_plan::DispatchStrategy {
468            r#type: self.r#type,
469            dist_key_indices: self.dist_key_indices.clone(),
470            output_mapping: self.output_mapping.clone(),
471        }
472    }
473}
474
475impl stream_plan::DispatchOutputMapping {
476    /// Create a mapping that forwards all columns.
477    pub fn identical(len: usize) -> Self {
478        Self {
479            indices: (0..len as u32).collect(),
480            types: Vec::new(),
481        }
482    }
483
484    /// Create a mapping that forwards columns with given indices, without type conversion.
485    pub fn simple(indices: Vec<u32>) -> Self {
486        Self {
487            indices,
488            types: Vec::new(),
489        }
490    }
491
492    /// Assert that this mapping does not involve type conversion and return the indices.
493    pub fn into_simple_indices(self) -> Vec<u32> {
494        assert!(
495            self.types.is_empty(),
496            "types must be empty for simple mapping"
497        );
498        self.indices
499    }
500}
501
impl catalog::StreamSourceInfo {
    /// Reports whether the source is shared; this is exactly the value of the
    /// `cdc_source_job` flag.
    ///
    /// Refer to [`Self::cdc_source_job`] for details.
    pub fn is_shared(&self) -> bool {
        self.cdc_source_job
    }
}
508
509impl stream_plan::PbStreamScanType {
510    pub fn is_reschedulable(&self) -> bool {
511        match self {
512            // todo: should this be true?
513            PbStreamScanType::UpstreamOnly => false,
514            PbStreamScanType::ArrangementBackfill => true,
515            PbStreamScanType::CrossDbSnapshotBackfill => true,
516            PbStreamScanType::SnapshotBackfill => true,
517            _ => false,
518        }
519    }
520}
521
522impl catalog::Sink {
523    // TODO: remove this placeholder
524    // creating table sink does not have an id, so we need a placeholder
525    pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
526
527    pub fn unique_identity(&self) -> String {
528        // TODO: use a more unique name
529        format!("{}", self.id)
530    }
531
532    /// Get `ignore_delete` with backward compatibility.
533    ///
534    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
535    #[allow(deprecated)]
536    pub fn ignore_delete(&self) -> bool {
537        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
538    }
539}
540
541impl stream_plan::SinkDesc {
542    /// Get `ignore_delete` with backward compatibility.
543    ///
544    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
545    #[allow(deprecated)]
546    pub fn ignore_delete(&self) -> bool {
547        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
548    }
549}
550
551impl connector_service::SinkParam {
552    /// Get `ignore_delete` with backward compatibility.
553    ///
554    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
555    #[allow(deprecated)]
556    pub fn ignore_delete(&self) -> bool {
557        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
558    }
559}
560
561impl catalog::Table {
562    /// Get clean watermark column indices with backward compatibility.
563    ///
564    /// Returns the new `clean_watermark_indices` if set, otherwise derives it from the old
565    /// `clean_watermark_index_in_pk` by converting PK index to column index.
566    ///
567    /// Note: a non-empty slice does not imply that the executor **SHOULD** clean this table
568    /// by watermark, but that the storage **CAN** clean this table by watermark. It's actually
569    /// the executor's responsibility to decide whether state cleaning is correct on semantics.
570    /// Besides, due to historical reasons, this method may return `[pk[0]]` even if the table
571    /// has nothing to do with watermark.
572    #[expect(deprecated)]
573    pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
574        if !self.clean_watermark_indices.is_empty() {
575            // New format: directly return clean_watermark_indices
576            self.clean_watermark_indices.clone()
577        } else if let Some(pk_idx) = self
578            .clean_watermark_index_in_pk
579            // At the very beginning, the watermark index was hard-coded to the first column of the pk.
580            .or_else(|| (!self.pk.is_empty()).then_some(0))
581        {
582            // Old format: convert PK index to column index
583            // The pk_idx is the position in the PK, we need to find the corresponding column index
584            if let Some(col_order) = self.pk.get(pk_idx as usize) {
585                vec![col_order.column_index]
586            } else {
587                if cfg!(debug_assertions) {
588                    panic!("clean_watermark_index_in_pk is out of range: {self:?}");
589                }
590                vec![]
591            }
592        } else {
593            vec![]
594        }
595    }
596
597    /// Convert clean watermark column indices to PK indices and return the minimum.
598    /// Returns None if no clean watermark is configured.
599    ///
600    /// This is a backward-compatible method to replace the deprecated `clean_watermark_index_in_pk` field.
601    ///
602    /// Note: a `Some` return value does not imply that the executor **SHOULD** clean this table
603    /// by watermark, but that the storage **CAN** clean this table by watermark. It's actually
604    /// the executor's responsibility to decide whether state cleaning is correct on semantics.
605    /// Besides, due to historical reasons, this method may return `Some(pk[0])` even if the table
606    /// has nothing to do with watermark.
607    ///
608    /// TODO: remove this method after totally deprecating `clean_watermark_index_in_pk`.
609    pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
610        let clean_watermark_column_indices = self.get_clean_watermark_column_indices();
611
612        // Convert column indices to PK indices
613        let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
614            .iter()
615            .filter_map(|&col_idx| {
616                self.pk
617                    .iter()
618                    .position(|col_order| col_order.column_index == col_idx)
619            })
620            .collect();
621
622        // Return the minimum PK index
623        clean_watermark_indices_in_pk.iter().min().copied()
624    }
625}
626
627impl std::fmt::Debug for meta::SystemParams {
628    /// Directly formatting `SystemParams` can be inaccurate or leak sensitive information.
629    ///
630    /// Use `SystemParamsReader` instead.
631    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632        f.debug_struct("SystemParams").finish_non_exhaustive()
633    }
634}
635
636// More compact formats for debugging
637
638impl std::fmt::Debug for data::DataType {
639    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
640        let data::DataType {
641            precision,
642            scale,
643            interval_type,
644            field_type,
645            field_names,
646            field_ids,
647            type_name,
648            // currently all data types are nullable
649            is_nullable: _,
650        } = self;
651
652        let type_name = data::data_type::TypeName::try_from(*type_name)
653            .map(|t| t.as_str_name())
654            .unwrap_or("Unknown");
655
656        let mut s = f.debug_struct(type_name);
657        if self.precision != 0 {
658            s.field("precision", precision);
659        }
660        if self.scale != 0 {
661            s.field("scale", scale);
662        }
663        if self.interval_type != 0 {
664            s.field("interval_type", interval_type);
665        }
666        if !self.field_type.is_empty() {
667            s.field("field_type", field_type);
668        }
669        if !self.field_names.is_empty() {
670            s.field("field_names", field_names);
671        }
672        if !self.field_ids.is_empty() {
673            s.field("field_ids", field_ids);
674        }
675        s.finish()
676    }
677}
678
679impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
680    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681        match self {
682            Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
683            Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
684        }
685    }
686}
687
688impl std::fmt::Debug for plan_common::ColumnDesc {
689    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690        // destruct here to avoid missing new fields in the future.
691        let plan_common::ColumnDesc {
692            column_type,
693            column_id,
694            name,
695            description,
696            additional_column_type,
697            additional_column,
698            generated_or_default_column,
699            version,
700            nullable,
701        } = self;
702
703        let mut s = f.debug_struct("ColumnDesc");
704        if let Some(column_type) = column_type {
705            s.field("column_type", column_type);
706        } else {
707            s.field("column_type", &"Unknown");
708        }
709        s.field("column_id", column_id).field("name", name);
710        if let Some(description) = description {
711            s.field("description", description);
712        }
713        if self.additional_column_type != 0 {
714            s.field("additional_column_type", additional_column_type);
715        }
716        s.field("version", version);
717        if let Some(AdditionalColumn {
718            column_type: Some(column_type),
719        }) = additional_column
720        {
721            // AdditionalColumn { None } means a normal column
722            s.field("additional_column", &column_type);
723        }
724        if let Some(generated_or_default_column) = generated_or_default_column {
725            s.field("generated_or_default_column", &generated_or_default_column);
726        }
727        s.field("nullable", nullable);
728        s.finish()
729    }
730}
731
732impl expr::UserDefinedFunction {
733    pub fn name_in_runtime(&self) -> Option<&str> {
734        if self.version() < expr::UdfExprVersion::NameInRuntime {
735            if self.language == "rust" || self.language == "wasm" {
736                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
737                // is not used any more. The real bound function name should be the same
738                // as `name`.
739                Some(&self.name)
740            } else {
741                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
742                self.identifier.as_deref()
743            }
744        } else {
745            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
746            self.identifier.as_deref()
747        }
748    }
749}
750
751impl expr::UserDefinedFunctionMetadata {
752    pub fn name_in_runtime(&self) -> Option<&str> {
753        if self.version() < expr::UdfExprVersion::NameInRuntime {
754            if self.language == "rust" || self.language == "wasm" {
755                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
756                // is not used any more. And unfortunately, we don't have the original name
757                // in `PbUserDefinedFunctionMetadata`, so we need to extract the name from
758                // the old `identifier` value (e.g. `foo()->int32`).
759                let old_identifier = self
760                    .identifier
761                    .as_ref()
762                    .expect("Rust/WASM UDF must have identifier");
763                Some(
764                    old_identifier
765                        .split_once("(")
766                        .expect("the old identifier must contain `(`")
767                        .0,
768                )
769            } else {
770                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
771                self.identifier.as_deref()
772            }
773        } else {
774            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
775            self.identifier.as_deref()
776        }
777    }
778}
779
780impl streaming_job_resource_type::ResourceType {
781    pub fn resource_group(&self) -> Option<String> {
782        match self {
783            streaming_job_resource_type::ResourceType::Regular(_) => None,
784            streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
785            | streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group) => {
786                Some(group.clone())
787            }
788        }
789    }
790}
791
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    /// The getter of a nested message field returns the stored message.
    #[test]
    fn test_getter() {
        let field = Field {
            data_type: Some(DataType {
                is_nullable: true,
                ..Default::default()
            }),
            name: String::new(),
        };
        assert!(field.get_data_type().unwrap().is_nullable);
    }

    /// The getter of an enum field decodes the stored raw value.
    #[test]
    fn test_enum_getter() {
        let double = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        assert_eq!(
            data_type::TypeName::Double,
            double.get_type_name().unwrap()
        );
    }

    /// The getter of an enum field reports the unspecified value as an error.
    #[test]
    fn test_enum_unspecified() {
        let unspecified = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(unspecified.get_type_name().is_err());
    }

    /// The getter of a primitive field returns the default value of a default message.
    #[test]
    fn test_primitive_getter() {
        let default_type = DataType::default();
        let copied = DataType {
            is_nullable: default_type.get_is_nullable(),
            ..Default::default()
        };
        assert!(!copied.is_nullable);
    }

    /// Pins the size of `NodeBody` at compile time.
    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;
        // box all fields in NodeBody to avoid large_enum_variant
        // see https://github.com/risingwavelabs/risingwave/issues/19910
        const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
    }

    /// Covers the new field, the old field and the unconfigured case of
    /// `get_clean_watermark_index_in_pk_compat`.
    #[test]
    #[expect(deprecated)]
    fn test_get_clean_watermark_index_in_pk_compat() {
        use crate::catalog::Table;
        use crate::common::{ColumnOrder, OrderType};

        /// Builds a PK over the given column indices with the default order type.
        fn pk_on(column_indices: &[u32]) -> Vec<ColumnOrder> {
            column_indices
                .iter()
                .map(|&column_index| ColumnOrder {
                    column_index,
                    order_type: Some(OrderType::default()),
                })
                .collect()
        }

        // Test case 1: both fields are set
        let both_set = Table {
            clean_watermark_indices: vec![3, 2],
            clean_watermark_index_in_pk: Some(0),
            pk: pk_on(&[1, 2, 3, 4]),
            ..Default::default()
        };
        assert_eq!(both_set.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Test case 2: only old field is set
        let old_only = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: Some(1),
            pk: pk_on(&[0, 2]),
            ..Default::default()
        };
        assert_eq!(old_only.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Test case 3: no clean watermark configured
        let unconfigured = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: None,
            ..Default::default()
        };
        assert_eq!(unconfigured.get_clean_watermark_index_in_pk_compat(), None);
    }
}