// risingwave_pb/lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17#![allow(clippy::doc_lazy_continuation)]
18// for derived code of `Message`
19#![expect(clippy::doc_markdown)]
20#![expect(clippy::upper_case_acronyms)]
21#![expect(clippy::needless_lifetimes)]
22// For tonic::transport::Endpoint::connect
23#![expect(clippy::disallowed_methods)]
24#![expect(clippy::enum_variant_names)]
25#![expect(clippy::module_inception)]
26// FIXME: This should be fixed!!! https://github.com/risingwavelabs/risingwave/issues/19906
27#![expect(clippy::large_enum_variant)]
28#![feature(step_trait)]
29
30pub mod id;
31
32use std::str::FromStr;
33
34use event_recovery::RecoveryEvent;
35use plan_common::AdditionalColumn;
36pub use prost::Message;
37use risingwave_error::tonic::ToTonicStatus;
38use thiserror::Error;
39
40use crate::common::WorkerType;
41use crate::ddl_service::streaming_job_resource_type;
42use crate::id::{FragmentId, SourceId, WorkerId};
43use crate::meta::event_log::event_recovery;
44use crate::stream_plan::PbStreamScanType;
45
// Generated protobuf modules. Each has a deterministic `sim/` variant that
// madsim (simulation) builds are redirected to via `cfg_attr`.
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/catalog.rs")]
pub mod catalog;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/common.rs")]
pub mod common;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/compute.rs")]
pub mod compute;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
pub mod cloud_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/data.rs")]
pub mod data;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
pub mod ddl_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/expr.rs")]
pub mod expr;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/meta.rs")]
pub mod meta;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/plan_common.rs")]
pub mod plan_common;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
pub mod batch_plan;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/task_service.rs")]
pub mod task_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/connector_service.rs")]
pub mod connector_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
pub mod stream_plan;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/stream_service.rs")]
pub mod stream_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/hummock.rs")]
pub mod hummock;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/compactor.rs")]
pub mod compactor;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/user.rs")]
pub mod user;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/source.rs")]
pub mod source;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
pub mod monitor_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/backup_service.rs")]
pub mod backup_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
pub mod serverless_backfill_controller;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
pub mod frontend_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/java_binding.rs")]
pub mod java_binding;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/health.rs")]
pub mod health;
118#[rustfmt::skip]
119#[path = "sim/telemetry.rs"]
120pub mod telemetry;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
pub mod iceberg_compaction;
// NOTE(review): unlike its siblings, `window_function` has no `sim/` redirect —
// presumably a hand-written (non-generated) module; confirm.
#[rustfmt::skip]
pub mod window_function;
126
127#[rustfmt::skip]
128#[path = "sim/secret.rs"]
129pub mod secret;
// Generated serde (JSON) support for the messages above. The file names contain
// dots, so each module needs an explicit `#[path]` attribute.
#[rustfmt::skip]
#[path = "connector_service.serde.rs"]
pub mod connector_service_serde;
#[rustfmt::skip]
#[path = "catalog.serde.rs"]
pub mod catalog_serde;
#[rustfmt::skip]
#[path = "common.serde.rs"]
pub mod common_serde;
#[rustfmt::skip]
#[path = "compute.serde.rs"]
pub mod compute_serde;
#[rustfmt::skip]
#[path = "cloud_service.serde.rs"]
pub mod cloud_service_serde;
#[rustfmt::skip]
#[path = "data.serde.rs"]
pub mod data_serde;
#[rustfmt::skip]
#[path = "ddl_service.serde.rs"]
pub mod ddl_service_serde;
#[rustfmt::skip]
#[path = "expr.serde.rs"]
pub mod expr_serde;
#[rustfmt::skip]
#[path = "meta.serde.rs"]
pub mod meta_serde;
#[rustfmt::skip]
#[path = "plan_common.serde.rs"]
pub mod plan_common_serde;
#[rustfmt::skip]
#[path = "batch_plan.serde.rs"]
pub mod batch_plan_serde;
#[rustfmt::skip]
#[path = "task_service.serde.rs"]
pub mod task_service_serde;
#[rustfmt::skip]
#[path = "stream_plan.serde.rs"]
pub mod stream_plan_serde;
#[rustfmt::skip]
#[path = "stream_service.serde.rs"]
pub mod stream_service_serde;
#[rustfmt::skip]
#[path = "hummock.serde.rs"]
pub mod hummock_serde;
#[rustfmt::skip]
#[path = "compactor.serde.rs"]
pub mod compactor_serde;
#[rustfmt::skip]
#[path = "user.serde.rs"]
pub mod user_serde;
#[rustfmt::skip]
#[path = "source.serde.rs"]
pub mod source_serde;
#[rustfmt::skip]
#[path = "monitor_service.serde.rs"]
pub mod monitor_service_serde;
#[rustfmt::skip]
#[path = "backup_service.serde.rs"]
pub mod backup_service_serde;
#[rustfmt::skip]
#[path = "java_binding.serde.rs"]
pub mod java_binding_serde;
#[rustfmt::skip]
#[path = "telemetry.serde.rs"]
pub mod telemetry_serde;

#[rustfmt::skip]
#[path = "secret.serde.rs"]
pub mod secret_serde;
#[rustfmt::skip]
#[path = "serverless_backfill_controller.serde.rs"]
pub mod serverless_backfill_controller_serde;
203
204pub const MONITOR_SERVICE_MESSAGE_SIZE_LIMIT: usize = 64 * 1024 * 1024;
205
206#[cfg(not(madsim))]
207pub fn configured_monitor_service_client<T>(
208    client: monitor_service::monitor_service_client::MonitorServiceClient<T>,
209) -> monitor_service::monitor_service_client::MonitorServiceClient<T>
210where
211    T: tonic::client::GrpcService<tonic::body::Body>,
212    T::Error: Into<tonic::codegen::StdError>,
213    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
214    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
215{
216    client
217        .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
218        .max_decoding_message_size(MONITOR_SERVICE_MESSAGE_SIZE_LIMIT)
219}
220
/// Madsim variant of [`configured_monitor_service_client`]: same compression
/// and size-limit configuration, but against madsim's interceptor-typed client.
#[cfg(madsim)]
pub fn configured_monitor_service_client<F>(
    client: monitor_service::monitor_service_client::MonitorServiceClient<
        tonic::transport::Channel,
        F,
    >,
) -> monitor_service::monitor_service_client::MonitorServiceClient<tonic::transport::Channel, F>
where
    F: tonic::service::Interceptor,
{
    let client = client.accept_compressed(tonic::codec::CompressionEncoding::Zstd);
    client.max_decoding_message_size(MONITOR_SERVICE_MESSAGE_SIZE_LIMIT)
}
235
236#[cfg(not(madsim))]
237pub fn configured_monitor_service_server<T>(
238    server: monitor_service::monitor_service_server::MonitorServiceServer<T>,
239) -> monitor_service::monitor_service_server::MonitorServiceServer<T> {
240    server.send_compressed(tonic::codec::CompressionEncoding::Zstd)
241}
242
/// Madsim variant of [`configured_monitor_service_server`].
#[cfg(madsim)]
pub fn configured_monitor_service_server<T, F>(
    server: monitor_service::monitor_service_server::MonitorServiceServer<T, F>,
) -> monitor_service::monitor_service_server::MonitorServiceServer<T, F>
where
    T: monitor_service::monitor_service_server::MonitorService,
    F: tonic::service::Interceptor,
{
    let encoding = tonic::codec::CompressionEncoding::Zstd;
    server.send_compressed(encoding)
}
253
/// Error for a missing field in a received protobuf message.
/// The payload is the name of the absent field.
#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
pub struct PbFieldNotFound(pub &'static str);
257
impl From<PbFieldNotFound> for tonic::Status {
    // Surface a missing proto field as a gRPC `Internal` status;
    // `to_status_unnamed` presumably skips the service-name prefix that the
    // named variant would add — see `risingwave_error::tonic` to confirm.
    fn from(e: PbFieldNotFound) -> Self {
        e.to_status_unnamed(tonic::Code::Internal)
    }
}
263
264impl FromStr for crate::expr::table_function::PbType {
265    type Err = ();
266
267    fn from_str(s: &str) -> Result<Self, Self::Err> {
268        Self::from_str_name(&s.to_uppercase()).ok_or(())
269    }
270}
271
272impl FromStr for crate::expr::agg_call::PbKind {
273    type Err = ();
274
275    fn from_str(s: &str) -> Result<Self, Self::Err> {
276        Self::from_str_name(&s.to_uppercase()).ok_or(())
277    }
278}
279
280impl stream_plan::MaterializeNode {
281    pub fn dist_key_indices(&self) -> Vec<u32> {
282        self.get_table()
283            .unwrap()
284            .distribution_key
285            .iter()
286            .map(|i| *i as u32)
287            .collect()
288    }
289
290    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
291        self.get_table()
292            .unwrap()
293            .columns
294            .iter()
295            .map(|c| c.get_column_desc().unwrap().clone())
296            .collect()
297    }
298}
299
300impl stream_plan::StreamScanNode {
301    /// See [`Self::upstream_column_ids`].
302    pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
303        self.upstream_column_ids
304            .iter()
305            .map(|id| {
306                (self.table_desc.as_ref().unwrap().columns.iter())
307                    .find(|c| c.column_id == *id)
308                    .unwrap()
309                    .clone()
310            })
311            .collect()
312    }
313}
314
315impl stream_plan::SourceBackfillNode {
316    pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
317        self.columns
318            .iter()
319            .map(|c| c.column_desc.as_ref().unwrap().clone())
320            .collect()
321    }
322}
323
324// Encapsulating the use of parallelism.
325impl common::WorkerNode {
326    pub fn compute_node_parallelism(&self) -> usize {
327        assert_eq!(self.r#type(), WorkerType::ComputeNode);
328        self.property
329            .as_ref()
330            .expect("property should be exist")
331            .parallelism as usize
332    }
333
334    fn compactor_node_parallelism(&self) -> usize {
335        assert_eq!(self.r#type(), WorkerType::Compactor);
336        self.property
337            .as_ref()
338            .expect("property should be exist")
339            .parallelism as usize
340    }
341
342    pub fn parallelism(&self) -> Option<usize> {
343        match self.r#type() {
344            WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
345            WorkerType::Compactor => Some(self.compactor_node_parallelism()),
346            _ => None,
347        }
348    }
349
350    pub fn resource_group(&self) -> Option<String> {
351        self.property
352            .as_ref()
353            .and_then(|p| p.resource_group.clone())
354    }
355}
356
357impl stream_plan::SourceNode {
358    pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
359        Some(
360            self.source_inner
361                .as_ref()?
362                .columns
363                .iter()
364                .map(|c| c.get_column_desc().unwrap().clone())
365                .collect(),
366        )
367    }
368}
369
370impl meta::table_fragments::ActorStatus {
371    pub fn worker_id(&self) -> WorkerId {
372        self.location
373            .as_ref()
374            .expect("actor location should be exist")
375            .worker_node_id
376    }
377}
378
impl common::ActorLocation {
    /// Build a location from a worker id. Always returns `Some`; the `Option`
    /// wrapper presumably mirrors the optional proto field it is assigned to —
    /// TODO(review): confirm at call sites.
    pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
        Some(Self { worker_node_id })
    }
}
384
385impl meta::event_log::EventRecovery {
386    pub fn event_type(&self) -> &str {
387        match self.recovery_event.as_ref() {
388            Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
389            Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
390            Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
391            Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
392            Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
393            Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
394            None => "UNKNOWN_RECOVERY_EVENT",
395        }
396    }
397
398    pub fn database_recovery_start(database_id: u32) -> Self {
399        Self {
400            recovery_event: Some(RecoveryEvent::DatabaseStart(
401                event_recovery::DatabaseRecoveryStart { database_id },
402            )),
403        }
404    }
405
406    pub fn database_recovery_failure(database_id: u32) -> Self {
407        Self {
408            recovery_event: Some(RecoveryEvent::DatabaseFailure(
409                event_recovery::DatabaseRecoveryFailure { database_id },
410            )),
411        }
412    }
413
414    pub fn database_recovery_success(database_id: u32) -> Self {
415        Self {
416            recovery_event: Some(RecoveryEvent::DatabaseSuccess(
417                event_recovery::DatabaseRecoverySuccess { database_id },
418            )),
419        }
420    }
421
422    pub fn global_recovery_start(reason: String) -> Self {
423        Self {
424            recovery_event: Some(RecoveryEvent::GlobalStart(
425                event_recovery::GlobalRecoveryStart { reason },
426            )),
427        }
428    }
429
430    pub fn global_recovery_success(
431        reason: String,
432        duration_secs: f32,
433        running_database_ids: Vec<u32>,
434        recovering_database_ids: Vec<u32>,
435    ) -> Self {
436        Self {
437            recovery_event: Some(RecoveryEvent::GlobalSuccess(
438                event_recovery::GlobalRecoverySuccess {
439                    reason,
440                    duration_secs,
441                    running_database_ids,
442                    recovering_database_ids,
443                },
444            )),
445        }
446    }
447
448    pub fn global_recovery_failure(reason: String, error: String) -> Self {
449        Self {
450            recovery_event: Some(RecoveryEvent::GlobalFailure(
451                event_recovery::GlobalRecoveryFailure { reason, error },
452            )),
453        }
454    }
455}
456
457impl stream_plan::StreamNode {
458    /// Find the external stream source info inside the stream node, if any.
459    ///
460    /// Returns `source_id`.
461    pub fn find_stream_source(&self) -> Option<SourceId> {
462        if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
463            self.node_body.as_ref()
464            && let Some(inner) = &source.source_inner
465        {
466            return Some(inner.source_id);
467        }
468
469        for child in &self.input {
470            if let Some(source) = child.find_stream_source() {
471                return Some(source);
472            }
473        }
474
475        None
476    }
477
478    /// Find the external stream source info inside the stream node, if any.
479    ///
480    /// Returns (`source_id`, `upstream_source_fragment_id`).
481    ///
482    /// Note: we must get upstream fragment id from the merge node, not from the fragment's
483    /// `upstream_fragment_ids`. e.g., DynamicFilter may have 2 upstream fragments, but only
484    /// one is the upstream source fragment.
485    pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
486        if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
487            self.node_body.as_ref()
488        {
489            if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
490                self.input[0].node_body.as_ref().unwrap()
491            {
492                // Note: avoid using `merge.upstream_actor_id` to prevent misuse.
493                // See comments there for details.
494                return Some((source.upstream_source_id, merge.upstream_fragment_id));
495            } else {
496                unreachable!(
497                    "source backfill must have a merge node as its input: {:?}",
498                    self
499                );
500            }
501        }
502
503        for child in &self.input {
504            if let Some(source) = child.find_source_backfill() {
505                return Some(source);
506            }
507        }
508
509        None
510    }
511}
512impl stream_plan::Dispatcher {
513    pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
514        stream_plan::DispatchStrategy {
515            r#type: self.r#type,
516            dist_key_indices: self.dist_key_indices.clone(),
517            output_mapping: self.output_mapping.clone(),
518        }
519    }
520}
521
522impl stream_plan::DispatchOutputMapping {
523    /// Create a mapping that forwards all columns.
524    pub fn identical(len: usize) -> Self {
525        Self {
526            indices: (0..len as u32).collect(),
527            types: Vec::new(),
528        }
529    }
530
531    /// Create a mapping that forwards columns with given indices, without type conversion.
532    pub fn simple(indices: Vec<u32>) -> Self {
533        Self {
534            indices,
535            types: Vec::new(),
536        }
537    }
538
539    /// Assert that this mapping does not involve type conversion and return the indices.
540    pub fn into_simple_indices(self) -> Vec<u32> {
541        assert!(
542            self.types.is_empty(),
543            "types must be empty for simple mapping"
544        );
545        self.indices
546    }
547}
548
impl catalog::StreamSourceInfo {
    /// Whether this source is shared.
    ///
    /// Refer to [`Self::cdc_source_job`] for details.
    pub fn is_shared(&self) -> bool {
        self.cdc_source_job
    }
}
555
556impl stream_plan::PbStreamScanType {
557    pub fn is_reschedulable(&self, is_online: bool) -> bool {
558        match self {
559            PbStreamScanType::Unspecified => {
560                unreachable!()
561            }
562            // todo: should this be true?
563            PbStreamScanType::UpstreamOnly => false,
564            PbStreamScanType::ArrangementBackfill => true,
565            PbStreamScanType::CrossDbSnapshotBackfill => true,
566            PbStreamScanType::SnapshotBackfill => !is_online,
567            PbStreamScanType::Chain | PbStreamScanType::Rearrange | PbStreamScanType::Backfill => {
568                false
569            }
570        }
571    }
572}
573
574impl catalog::Sink {
575    // TODO: remove this placeholder
576    // creating table sink does not have an id, so we need a placeholder
577    pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
578
579    pub fn unique_identity(&self) -> String {
580        // TODO: use a more unique name
581        format!("{}", self.id)
582    }
583
584    /// Get `ignore_delete` with backward compatibility.
585    ///
586    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
587    #[allow(deprecated)]
588    pub fn ignore_delete(&self) -> bool {
589        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
590    }
591}
592
593impl stream_plan::SinkDesc {
594    /// Get `ignore_delete` with backward compatibility.
595    ///
596    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
597    #[allow(deprecated)]
598    pub fn ignore_delete(&self) -> bool {
599        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
600    }
601}
602
603impl connector_service::SinkParam {
604    /// Get `ignore_delete` with backward compatibility.
605    ///
606    /// Historically we use `sink_type == ForceAppendOnly` to represent this behavior.
607    #[allow(deprecated)]
608    pub fn ignore_delete(&self) -> bool {
609        self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
610    }
611}
612
impl catalog::Table {
    /// Get clean watermark column indices with backward compatibility.
    ///
    /// Returns the new `clean_watermark_indices` if set, otherwise derives it from the old
    /// `clean_watermark_index_in_pk` by converting PK index to column index.
    ///
    /// Note: a non-empty slice does not imply that the executor **SHOULD** clean this table
    /// by watermark, but that the storage **CAN** clean this table by watermark. It's actually
    /// the executor's responsibility to decide whether state cleaning is correct on semantics.
    /// Besides, due to historical reasons, this method may return `[pk[0]]` even if the table
    /// has nothing to do with watermark.
    #[expect(deprecated)]
    pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
        if !self.clean_watermark_indices.is_empty() {
            // New format: directly return clean_watermark_indices
            self.clean_watermark_indices.clone()
        } else if let Some(pk_idx) = self
            .clean_watermark_index_in_pk
            // At the very beginning, the watermark index was hard-coded to the first column of the pk.
            .or_else(|| (!self.pk.is_empty()).then_some(0))
        {
            // Old format: convert PK index to column index
            // The pk_idx is the position in the PK, we need to find the corresponding column index
            if let Some(col_order) = self.pk.get(pk_idx as usize) {
                vec![col_order.column_index]
            } else {
                // Out-of-range legacy value: loud failure in debug builds,
                // silently "no watermark column" in release builds.
                if cfg!(debug_assertions) {
                    panic!("clean_watermark_index_in_pk is out of range: {self:?}");
                }
                vec![]
            }
        } else {
            vec![]
        }
    }

    /// Convert clean watermark column indices to PK indices and return the minimum.
    /// Returns None if no clean watermark is configured.
    ///
    /// This is a backward-compatible method to replace the deprecated `clean_watermark_index_in_pk` field.
    ///
    /// Note: a `Some` return value does not imply that the executor **SHOULD** clean this table
    /// by watermark, but that the storage **CAN** clean this table by watermark. It's actually
    /// the executor's responsibility to decide whether state cleaning is correct on semantics.
    /// Besides, due to historical reasons, this method may return `Some(pk[0])` even if the table
    /// has nothing to do with watermark.
    ///
    /// TODO: remove this method after totally deprecating `clean_watermark_index_in_pk`.
    pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
        let clean_watermark_column_indices = self.get_clean_watermark_column_indices();

        // Convert column indices to PK indices
        // (column indices that are not part of the PK are silently dropped).
        let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
            .iter()
            .filter_map(|&col_idx| {
                self.pk
                    .iter()
                    .position(|col_order| col_order.column_index == col_idx)
            })
            .collect();

        // Return the minimum PK index
        clean_watermark_indices_in_pk.iter().min().copied()
    }
}
678
impl std::fmt::Debug for meta::SystemParams {
    /// Directly formatting `SystemParams` can be inaccurate or leak sensitive information.
    ///
    /// Use `SystemParamsReader` instead.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Intentionally opaque: always renders as `SystemParams { .. }`.
        f.debug_struct("SystemParams").finish_non_exhaustive()
    }
}
687
688// More compact formats for debugging
689
690impl std::fmt::Debug for data::DataType {
691    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
692        let data::DataType {
693            precision,
694            scale,
695            interval_type,
696            field_type,
697            field_names,
698            field_ids,
699            type_name,
700            // currently all data types are nullable
701            is_nullable: _,
702        } = self;
703
704        let type_name = data::data_type::TypeName::try_from(*type_name)
705            .map(|t| t.as_str_name())
706            .unwrap_or("Unknown");
707
708        let mut s = f.debug_struct(type_name);
709        if self.precision != 0 {
710            s.field("precision", precision);
711        }
712        if self.scale != 0 {
713            s.field("scale", scale);
714        }
715        if self.interval_type != 0 {
716            s.field("interval_type", interval_type);
717        }
718        if !self.field_type.is_empty() {
719            s.field("field_type", field_type);
720        }
721        if !self.field_names.is_empty() {
722            s.field("field_names", field_names);
723        }
724        if !self.field_ids.is_empty() {
725            s.field("field_ids", field_ids);
726        }
727        s.finish()
728    }
729}
730
731impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
732    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
733        match self {
734            Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
735            Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
736        }
737    }
738}
739
740impl std::fmt::Debug for plan_common::ColumnDesc {
741    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
742        // destruct here to avoid missing new fields in the future.
743        let plan_common::ColumnDesc {
744            column_type,
745            column_id,
746            name,
747            description,
748            additional_column_type,
749            additional_column,
750            generated_or_default_column,
751            version,
752            nullable,
753        } = self;
754
755        let mut s = f.debug_struct("ColumnDesc");
756        if let Some(column_type) = column_type {
757            s.field("column_type", column_type);
758        } else {
759            s.field("column_type", &"Unknown");
760        }
761        s.field("column_id", column_id).field("name", name);
762        if let Some(description) = description {
763            s.field("description", description);
764        }
765        if self.additional_column_type != 0 {
766            s.field("additional_column_type", additional_column_type);
767        }
768        s.field("version", version);
769        if let Some(AdditionalColumn {
770            column_type: Some(column_type),
771        }) = additional_column
772        {
773            // AdditionalColumn { None } means a normal column
774            s.field("additional_column", &column_type);
775        }
776        if let Some(generated_or_default_column) = generated_or_default_column {
777            s.field("generated_or_default_column", &generated_or_default_column);
778        }
779        s.field("nullable", nullable);
780        s.finish()
781    }
782}
783
784impl expr::UserDefinedFunction {
785    pub fn name_in_runtime(&self) -> Option<&str> {
786        if self.version() < expr::UdfExprVersion::NameInRuntime {
787            if self.language == "rust" || self.language == "wasm" {
788                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
789                // is not used any more. The real bound function name should be the same
790                // as `name`.
791                Some(&self.name)
792            } else {
793                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
794                self.identifier.as_deref()
795            }
796        } else {
797            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
798            self.identifier.as_deref()
799        }
800    }
801}
802
803impl expr::UserDefinedFunctionMetadata {
804    pub fn name_in_runtime(&self) -> Option<&str> {
805        if self.version() < expr::UdfExprVersion::NameInRuntime {
806            if self.language == "rust" || self.language == "wasm" {
807                // The `identifier` value of Rust and WASM UDF before `NameInRuntime`
808                // is not used any more. And unfortunately, we don't have the original name
809                // in `PbUserDefinedFunctionMetadata`, so we need to extract the name from
810                // the old `identifier` value (e.g. `foo()->int32`).
811                let old_identifier = self
812                    .identifier
813                    .as_ref()
814                    .expect("Rust/WASM UDF must have identifier");
815                Some(
816                    old_identifier
817                        .split_once("(")
818                        .expect("the old identifier must contain `(`")
819                        .0,
820                )
821            } else {
822                // `identifier`s of other UDFs already mean `name_in_runtime` before `NameInRuntime`.
823                self.identifier.as_deref()
824            }
825        } else {
826            // after `PbUdfExprVersion::NameInRuntime`, `identifier` means `name_in_runtime`
827            self.identifier.as_deref()
828        }
829    }
830}
831
832impl streaming_job_resource_type::ResourceType {
833    pub fn resource_group(&self) -> Option<String> {
834        match self {
835            streaming_job_resource_type::ResourceType::Regular(_) => None,
836            streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
837            | streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group) => {
838                Some(group.clone())
839            }
840        }
841    }
842}
843
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    #[test]
    fn test_getter() {
        let data_type: DataType = DataType {
            is_nullable: true,
            ..Default::default()
        };
        let field = Field {
            data_type: Some(data_type),
            name: "".to_owned(),
        };
        assert!(field.get_data_type().unwrap().is_nullable);
    }

    #[test]
    fn test_enum_getter() {
        // Struct-update syntax instead of mutating a fresh `Default`
        // (clippy: `field_reassign_with_default`).
        let data_type = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        assert_eq!(
            data_type::TypeName::Double,
            data_type.get_type_name().unwrap()
        );
    }

    #[test]
    fn test_enum_unspecified() {
        // Struct-update syntax instead of mutating a fresh `Default`
        // (clippy: `field_reassign_with_default`).
        let data_type = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(data_type.get_type_name().is_err());
    }

    #[test]
    fn test_primitive_getter() {
        let data_type: DataType = DataType::default();
        let new_data_type = DataType {
            is_nullable: data_type.get_is_nullable(),
            ..Default::default()
        };
        assert!(!new_data_type.is_nullable);
    }

    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;
        // box all fields in NodeBody to avoid large_enum_variant
        // see https://github.com/risingwavelabs/risingwave/issues/19910
        const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
    }

    #[test]
    #[expect(deprecated)]
    fn test_get_clean_watermark_index_in_pk_compat() {
        use crate::catalog::Table;
        use crate::common::{ColumnOrder, OrderType};

        fn create_column_order(column_index: u32) -> ColumnOrder {
            ColumnOrder {
                column_index,
                order_type: Some(OrderType::default()),
            }
        }

        // Test case 1: both fields are set — the new field wins.
        let table = Table {
            clean_watermark_indices: vec![3, 2],
            clean_watermark_index_in_pk: Some(0),
            pk: vec![
                create_column_order(1),
                create_column_order(2),
                create_column_order(3),
                create_column_order(4),
            ],
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Test case 2: only old field is set — PK index converted via column index.
        let table = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: Some(1),
            pk: vec![create_column_order(0), create_column_order(2)],
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Test case 3: no clean watermark configured (and empty PK, so no legacy fallback).
        let table = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: None,
            ..Default::default()
        };
        assert_eq!(table.get_clean_watermark_index_in_pk_compat(), None);
    }
}