1#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17#![allow(clippy::doc_lazy_continuation)]
18#![expect(clippy::doc_markdown)]
20#![expect(clippy::upper_case_acronyms)]
21#![expect(clippy::needless_lifetimes)]
22#![expect(clippy::disallowed_methods)]
24#![expect(clippy::enum_variant_names)]
25#![expect(clippy::module_inception)]
26#![expect(clippy::large_enum_variant)]
28#![feature(step_trait)]
29
30pub mod id;
31
32use std::str::FromStr;
33
34use event_recovery::RecoveryEvent;
35use plan_common::AdditionalColumn;
36pub use prost::Message;
37use risingwave_error::tonic::ToTonicStatus;
38use thiserror::Error;
39
40use crate::common::WorkerType;
41use crate::ddl_service::streaming_job_resource_type;
42use crate::id::{FragmentId, SourceId, WorkerId};
43use crate::meta::event_log::event_recovery;
44use crate::stream_plan::PbStreamScanType;
45
46#[rustfmt::skip]
47#[cfg_attr(madsim, path = "sim/catalog.rs")]
48pub mod catalog;
49#[rustfmt::skip]
50#[cfg_attr(madsim, path = "sim/common.rs")]
51pub mod common;
52#[rustfmt::skip]
53#[cfg_attr(madsim, path = "sim/compute.rs")]
54pub mod compute;
55#[rustfmt::skip]
56#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
57pub mod cloud_service;
58#[rustfmt::skip]
59#[cfg_attr(madsim, path = "sim/data.rs")]
60pub mod data;
61#[rustfmt::skip]
62#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
63pub mod ddl_service;
64#[rustfmt::skip]
65#[cfg_attr(madsim, path = "sim/expr.rs")]
66pub mod expr;
67#[rustfmt::skip]
68#[cfg_attr(madsim, path = "sim/meta.rs")]
69pub mod meta;
70#[rustfmt::skip]
71#[cfg_attr(madsim, path = "sim/plan_common.rs")]
72pub mod plan_common;
73#[rustfmt::skip]
74#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
75pub mod batch_plan;
76#[rustfmt::skip]
77#[cfg_attr(madsim, path = "sim/task_service.rs")]
78pub mod task_service;
79#[rustfmt::skip]
80#[cfg_attr(madsim, path = "sim/connector_service.rs")]
81pub mod connector_service;
82#[rustfmt::skip]
83#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
84pub mod stream_plan;
85#[rustfmt::skip]
86#[cfg_attr(madsim, path = "sim/stream_service.rs")]
87pub mod stream_service;
88#[rustfmt::skip]
89#[cfg_attr(madsim, path = "sim/hummock.rs")]
90pub mod hummock;
91#[rustfmt::skip]
92#[cfg_attr(madsim, path = "sim/compactor.rs")]
93pub mod compactor;
94#[rustfmt::skip]
95#[cfg_attr(madsim, path = "sim/user.rs")]
96pub mod user;
97#[rustfmt::skip]
98#[cfg_attr(madsim, path = "sim/source.rs")]
99pub mod source;
100#[rustfmt::skip]
101#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
102pub mod monitor_service;
103#[rustfmt::skip]
104#[cfg_attr(madsim, path = "sim/backup_service.rs")]
105pub mod backup_service;
106#[rustfmt::skip]
107#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
108pub mod serverless_backfill_controller;
109#[rustfmt::skip]
110#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
111pub mod frontend_service;
112#[rustfmt::skip]
113#[cfg_attr(madsim, path = "sim/java_binding.rs")]
114pub mod java_binding;
115#[rustfmt::skip]
116#[cfg_attr(madsim, path = "sim/health.rs")]
117pub mod health;
118#[rustfmt::skip]
119#[path = "sim/telemetry.rs"]
120pub mod telemetry;
121#[rustfmt::skip]
122#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
123pub mod iceberg_compaction;
124#[rustfmt::skip]
125pub mod window_function;
126
127#[rustfmt::skip]
128#[path = "sim/secret.rs"]
129pub mod secret;
130#[rustfmt::skip]
131#[path = "connector_service.serde.rs"]
132pub mod connector_service_serde;
133#[rustfmt::skip]
134#[path = "catalog.serde.rs"]
135pub mod catalog_serde;
136#[rustfmt::skip]
137#[path = "common.serde.rs"]
138pub mod common_serde;
139#[rustfmt::skip]
140#[path = "compute.serde.rs"]
141pub mod compute_serde;
142#[rustfmt::skip]
143#[path = "cloud_service.serde.rs"]
144pub mod cloud_service_serde;
145#[rustfmt::skip]
146#[path = "data.serde.rs"]
147pub mod data_serde;
148#[rustfmt::skip]
149#[path = "ddl_service.serde.rs"]
150pub mod ddl_service_serde;
151#[rustfmt::skip]
152#[path = "expr.serde.rs"]
153pub mod expr_serde;
154#[rustfmt::skip]
155#[path = "meta.serde.rs"]
156pub mod meta_serde;
157#[rustfmt::skip]
158#[path = "plan_common.serde.rs"]
159pub mod plan_common_serde;
160#[rustfmt::skip]
161#[path = "batch_plan.serde.rs"]
162pub mod batch_plan_serde;
163#[rustfmt::skip]
164#[path = "task_service.serde.rs"]
165pub mod task_service_serde;
166#[rustfmt::skip]
167#[path = "stream_plan.serde.rs"]
168pub mod stream_plan_serde;
169#[rustfmt::skip]
170#[path = "stream_service.serde.rs"]
171pub mod stream_service_serde;
172#[rustfmt::skip]
173#[path = "hummock.serde.rs"]
174pub mod hummock_serde;
175#[rustfmt::skip]
176#[path = "compactor.serde.rs"]
177pub mod compactor_serde;
178#[rustfmt::skip]
179#[path = "user.serde.rs"]
180pub mod user_serde;
181#[rustfmt::skip]
182#[path = "source.serde.rs"]
183pub mod source_serde;
184#[rustfmt::skip]
185#[path = "monitor_service.serde.rs"]
186pub mod monitor_service_serde;
187#[rustfmt::skip]
188#[path = "backup_service.serde.rs"]
189pub mod backup_service_serde;
190#[rustfmt::skip]
191#[path = "java_binding.serde.rs"]
192pub mod java_binding_serde;
193#[rustfmt::skip]
194#[path = "telemetry.serde.rs"]
195pub mod telemetry_serde;
196
197#[rustfmt::skip]
198#[path = "secret.serde.rs"]
199pub mod secret_serde;
200#[rustfmt::skip]
201#[path = "serverless_backfill_controller.serde.rs"]
202pub mod serverless_backfill_controller_serde;
203
204pub const MONITOR_SERVICE_MESSAGE_SIZE_LIMIT: usize = 64 * 1024 * 1024;
205
206#[cfg(not(madsim))]
207pub fn configured_monitor_service_client<T>(
208 client: monitor_service::monitor_service_client::MonitorServiceClient<T>,
209) -> monitor_service::monitor_service_client::MonitorServiceClient<T>
210where
211 T: tonic::client::GrpcService<tonic::body::Body>,
212 T::Error: Into<tonic::codegen::StdError>,
213 T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
214 <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
215{
216 client
217 .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
218 .max_decoding_message_size(MONITOR_SERVICE_MESSAGE_SIZE_LIMIT)
219}
220
221#[cfg(madsim)]
222pub fn configured_monitor_service_client<F>(
223 client: monitor_service::monitor_service_client::MonitorServiceClient<
224 tonic::transport::Channel,
225 F,
226 >,
227) -> monitor_service::monitor_service_client::MonitorServiceClient<tonic::transport::Channel, F>
228where
229 F: tonic::service::Interceptor,
230{
231 client
232 .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
233 .max_decoding_message_size(MONITOR_SERVICE_MESSAGE_SIZE_LIMIT)
234}
235
236#[cfg(not(madsim))]
237pub fn configured_monitor_service_server<T>(
238 server: monitor_service::monitor_service_server::MonitorServiceServer<T>,
239) -> monitor_service::monitor_service_server::MonitorServiceServer<T> {
240 server.send_compressed(tonic::codec::CompressionEncoding::Zstd)
241}
242
243#[cfg(madsim)]
244pub fn configured_monitor_service_server<T, F>(
245 server: monitor_service::monitor_service_server::MonitorServiceServer<T, F>,
246) -> monitor_service::monitor_service_server::MonitorServiceServer<T, F>
247where
248 T: monitor_service::monitor_service_server::MonitorService,
249 F: tonic::service::Interceptor,
250{
251 server.send_compressed(tonic::codec::CompressionEncoding::Zstd)
252}
253
254#[derive(Clone, PartialEq, Eq, Debug, Error)]
255#[error("field `{0}` not found")]
256pub struct PbFieldNotFound(pub &'static str);
257
258impl From<PbFieldNotFound> for tonic::Status {
259 fn from(e: PbFieldNotFound) -> Self {
260 e.to_status_unnamed(tonic::Code::Internal)
261 }
262}
263
264impl FromStr for crate::expr::table_function::PbType {
265 type Err = ();
266
267 fn from_str(s: &str) -> Result<Self, Self::Err> {
268 Self::from_str_name(&s.to_uppercase()).ok_or(())
269 }
270}
271
272impl FromStr for crate::expr::agg_call::PbKind {
273 type Err = ();
274
275 fn from_str(s: &str) -> Result<Self, Self::Err> {
276 Self::from_str_name(&s.to_uppercase()).ok_or(())
277 }
278}
279
280impl stream_plan::MaterializeNode {
281 pub fn dist_key_indices(&self) -> Vec<u32> {
282 self.get_table()
283 .unwrap()
284 .distribution_key
285 .iter()
286 .map(|i| *i as u32)
287 .collect()
288 }
289
290 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
291 self.get_table()
292 .unwrap()
293 .columns
294 .iter()
295 .map(|c| c.get_column_desc().unwrap().clone())
296 .collect()
297 }
298}
299
300impl stream_plan::StreamScanNode {
301 pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
303 self.upstream_column_ids
304 .iter()
305 .map(|id| {
306 (self.table_desc.as_ref().unwrap().columns.iter())
307 .find(|c| c.column_id == *id)
308 .unwrap()
309 .clone()
310 })
311 .collect()
312 }
313}
314
315impl stream_plan::SourceBackfillNode {
316 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
317 self.columns
318 .iter()
319 .map(|c| c.column_desc.as_ref().unwrap().clone())
320 .collect()
321 }
322}
323
324impl common::WorkerNode {
326 pub fn compute_node_parallelism(&self) -> usize {
327 assert_eq!(self.r#type(), WorkerType::ComputeNode);
328 self.property
329 .as_ref()
330 .expect("property should be exist")
331 .parallelism as usize
332 }
333
334 fn compactor_node_parallelism(&self) -> usize {
335 assert_eq!(self.r#type(), WorkerType::Compactor);
336 self.property
337 .as_ref()
338 .expect("property should be exist")
339 .parallelism as usize
340 }
341
342 pub fn parallelism(&self) -> Option<usize> {
343 match self.r#type() {
344 WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
345 WorkerType::Compactor => Some(self.compactor_node_parallelism()),
346 _ => None,
347 }
348 }
349
350 pub fn resource_group(&self) -> Option<String> {
351 self.property
352 .as_ref()
353 .and_then(|p| p.resource_group.clone())
354 }
355}
356
357impl stream_plan::SourceNode {
358 pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
359 Some(
360 self.source_inner
361 .as_ref()?
362 .columns
363 .iter()
364 .map(|c| c.get_column_desc().unwrap().clone())
365 .collect(),
366 )
367 }
368}
369
370impl meta::table_fragments::ActorStatus {
371 pub fn worker_id(&self) -> WorkerId {
372 self.location
373 .as_ref()
374 .expect("actor location should be exist")
375 .worker_node_id
376 }
377}
378
379impl common::ActorLocation {
380 pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
381 Some(Self { worker_node_id })
382 }
383}
384
385impl meta::event_log::EventRecovery {
386 pub fn event_type(&self) -> &str {
387 match self.recovery_event.as_ref() {
388 Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
389 Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
390 Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
391 Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
392 Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
393 Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
394 None => "UNKNOWN_RECOVERY_EVENT",
395 }
396 }
397
398 pub fn database_recovery_start(database_id: u32) -> Self {
399 Self {
400 recovery_event: Some(RecoveryEvent::DatabaseStart(
401 event_recovery::DatabaseRecoveryStart { database_id },
402 )),
403 }
404 }
405
406 pub fn database_recovery_failure(database_id: u32) -> Self {
407 Self {
408 recovery_event: Some(RecoveryEvent::DatabaseFailure(
409 event_recovery::DatabaseRecoveryFailure { database_id },
410 )),
411 }
412 }
413
414 pub fn database_recovery_success(database_id: u32) -> Self {
415 Self {
416 recovery_event: Some(RecoveryEvent::DatabaseSuccess(
417 event_recovery::DatabaseRecoverySuccess { database_id },
418 )),
419 }
420 }
421
422 pub fn global_recovery_start(reason: String) -> Self {
423 Self {
424 recovery_event: Some(RecoveryEvent::GlobalStart(
425 event_recovery::GlobalRecoveryStart { reason },
426 )),
427 }
428 }
429
430 pub fn global_recovery_success(
431 reason: String,
432 duration_secs: f32,
433 running_database_ids: Vec<u32>,
434 recovering_database_ids: Vec<u32>,
435 ) -> Self {
436 Self {
437 recovery_event: Some(RecoveryEvent::GlobalSuccess(
438 event_recovery::GlobalRecoverySuccess {
439 reason,
440 duration_secs,
441 running_database_ids,
442 recovering_database_ids,
443 },
444 )),
445 }
446 }
447
448 pub fn global_recovery_failure(reason: String, error: String) -> Self {
449 Self {
450 recovery_event: Some(RecoveryEvent::GlobalFailure(
451 event_recovery::GlobalRecoveryFailure { reason, error },
452 )),
453 }
454 }
455}
456
457impl stream_plan::StreamNode {
458 pub fn find_stream_source(&self) -> Option<SourceId> {
462 if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
463 self.node_body.as_ref()
464 && let Some(inner) = &source.source_inner
465 {
466 return Some(inner.source_id);
467 }
468
469 for child in &self.input {
470 if let Some(source) = child.find_stream_source() {
471 return Some(source);
472 }
473 }
474
475 None
476 }
477
478 pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
486 if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
487 self.node_body.as_ref()
488 {
489 if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
490 self.input[0].node_body.as_ref().unwrap()
491 {
492 return Some((source.upstream_source_id, merge.upstream_fragment_id));
495 } else {
496 unreachable!(
497 "source backfill must have a merge node as its input: {:?}",
498 self
499 );
500 }
501 }
502
503 for child in &self.input {
504 if let Some(source) = child.find_source_backfill() {
505 return Some(source);
506 }
507 }
508
509 None
510 }
511}
512impl stream_plan::Dispatcher {
513 pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
514 stream_plan::DispatchStrategy {
515 r#type: self.r#type,
516 dist_key_indices: self.dist_key_indices.clone(),
517 output_mapping: self.output_mapping.clone(),
518 }
519 }
520}
521
522impl stream_plan::DispatchOutputMapping {
523 pub fn identical(len: usize) -> Self {
525 Self {
526 indices: (0..len as u32).collect(),
527 types: Vec::new(),
528 }
529 }
530
531 pub fn simple(indices: Vec<u32>) -> Self {
533 Self {
534 indices,
535 types: Vec::new(),
536 }
537 }
538
539 pub fn into_simple_indices(self) -> Vec<u32> {
541 assert!(
542 self.types.is_empty(),
543 "types must be empty for simple mapping"
544 );
545 self.indices
546 }
547}
548
549impl catalog::StreamSourceInfo {
550 pub fn is_shared(&self) -> bool {
552 self.cdc_source_job
553 }
554}
555
556impl stream_plan::PbStreamScanType {
557 pub fn is_reschedulable(&self, is_online: bool) -> bool {
558 match self {
559 PbStreamScanType::Unspecified => {
560 unreachable!()
561 }
562 PbStreamScanType::UpstreamOnly => false,
564 PbStreamScanType::ArrangementBackfill => true,
565 PbStreamScanType::CrossDbSnapshotBackfill => true,
566 PbStreamScanType::SnapshotBackfill => !is_online,
567 PbStreamScanType::Chain | PbStreamScanType::Rearrange | PbStreamScanType::Backfill => {
568 false
569 }
570 }
571 }
572}
573
574impl catalog::Sink {
575 pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
578
579 pub fn unique_identity(&self) -> String {
580 format!("{}", self.id)
582 }
583
584 #[allow(deprecated)]
588 pub fn ignore_delete(&self) -> bool {
589 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
590 }
591}
592
593impl stream_plan::SinkDesc {
594 #[allow(deprecated)]
598 pub fn ignore_delete(&self) -> bool {
599 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
600 }
601}
602
603impl connector_service::SinkParam {
604 #[allow(deprecated)]
608 pub fn ignore_delete(&self) -> bool {
609 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
610 }
611}
612
613impl catalog::Table {
614 #[expect(deprecated)]
625 pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
626 if !self.clean_watermark_indices.is_empty() {
627 self.clean_watermark_indices.clone()
629 } else if let Some(pk_idx) = self
630 .clean_watermark_index_in_pk
631 .or_else(|| (!self.pk.is_empty()).then_some(0))
633 {
634 if let Some(col_order) = self.pk.get(pk_idx as usize) {
637 vec![col_order.column_index]
638 } else {
639 if cfg!(debug_assertions) {
640 panic!("clean_watermark_index_in_pk is out of range: {self:?}");
641 }
642 vec![]
643 }
644 } else {
645 vec![]
646 }
647 }
648
649 pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
662 let clean_watermark_column_indices = self.get_clean_watermark_column_indices();
663
664 let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
666 .iter()
667 .filter_map(|&col_idx| {
668 self.pk
669 .iter()
670 .position(|col_order| col_order.column_index == col_idx)
671 })
672 .collect();
673
674 clean_watermark_indices_in_pk.iter().min().copied()
676 }
677}
678
679impl std::fmt::Debug for meta::SystemParams {
680 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
684 f.debug_struct("SystemParams").finish_non_exhaustive()
685 }
686}
687
688impl std::fmt::Debug for data::DataType {
691 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
692 let data::DataType {
693 precision,
694 scale,
695 interval_type,
696 field_type,
697 field_names,
698 field_ids,
699 type_name,
700 is_nullable: _,
702 } = self;
703
704 let type_name = data::data_type::TypeName::try_from(*type_name)
705 .map(|t| t.as_str_name())
706 .unwrap_or("Unknown");
707
708 let mut s = f.debug_struct(type_name);
709 if self.precision != 0 {
710 s.field("precision", precision);
711 }
712 if self.scale != 0 {
713 s.field("scale", scale);
714 }
715 if self.interval_type != 0 {
716 s.field("interval_type", interval_type);
717 }
718 if !self.field_type.is_empty() {
719 s.field("field_type", field_type);
720 }
721 if !self.field_names.is_empty() {
722 s.field("field_names", field_names);
723 }
724 if !self.field_ids.is_empty() {
725 s.field("field_ids", field_ids);
726 }
727 s.finish()
728 }
729}
730
731impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
732 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
733 match self {
734 Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
735 Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
736 }
737 }
738}
739
740impl std::fmt::Debug for plan_common::ColumnDesc {
741 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
742 let plan_common::ColumnDesc {
744 column_type,
745 column_id,
746 name,
747 description,
748 additional_column_type,
749 additional_column,
750 generated_or_default_column,
751 version,
752 nullable,
753 } = self;
754
755 let mut s = f.debug_struct("ColumnDesc");
756 if let Some(column_type) = column_type {
757 s.field("column_type", column_type);
758 } else {
759 s.field("column_type", &"Unknown");
760 }
761 s.field("column_id", column_id).field("name", name);
762 if let Some(description) = description {
763 s.field("description", description);
764 }
765 if self.additional_column_type != 0 {
766 s.field("additional_column_type", additional_column_type);
767 }
768 s.field("version", version);
769 if let Some(AdditionalColumn {
770 column_type: Some(column_type),
771 }) = additional_column
772 {
773 s.field("additional_column", &column_type);
775 }
776 if let Some(generated_or_default_column) = generated_or_default_column {
777 s.field("generated_or_default_column", &generated_or_default_column);
778 }
779 s.field("nullable", nullable);
780 s.finish()
781 }
782}
783
784impl expr::UserDefinedFunction {
785 pub fn name_in_runtime(&self) -> Option<&str> {
786 if self.version() < expr::UdfExprVersion::NameInRuntime {
787 if self.language == "rust" || self.language == "wasm" {
788 Some(&self.name)
792 } else {
793 self.identifier.as_deref()
795 }
796 } else {
797 self.identifier.as_deref()
799 }
800 }
801}
802
803impl expr::UserDefinedFunctionMetadata {
804 pub fn name_in_runtime(&self) -> Option<&str> {
805 if self.version() < expr::UdfExprVersion::NameInRuntime {
806 if self.language == "rust" || self.language == "wasm" {
807 let old_identifier = self
812 .identifier
813 .as_ref()
814 .expect("Rust/WASM UDF must have identifier");
815 Some(
816 old_identifier
817 .split_once("(")
818 .expect("the old identifier must contain `(`")
819 .0,
820 )
821 } else {
822 self.identifier.as_deref()
824 }
825 } else {
826 self.identifier.as_deref()
828 }
829 }
830}
831
832impl streaming_job_resource_type::ResourceType {
833 pub fn resource_group(&self) -> Option<String> {
834 match self {
835 streaming_job_resource_type::ResourceType::Regular(_) => None,
836 streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
837 | streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group) => {
838 Some(group.clone())
839 }
840 }
841 }
842}
843
844#[cfg(test)]
845mod tests {
846 use crate::data::{DataType, data_type};
847 use crate::plan_common::Field;
848 use crate::stream_plan::stream_node::NodeBody;
849
850 #[test]
851 fn test_getter() {
852 let data_type: DataType = DataType {
853 is_nullable: true,
854 ..Default::default()
855 };
856 let field = Field {
857 data_type: Some(data_type),
858 name: "".to_owned(),
859 };
860 assert!(field.get_data_type().unwrap().is_nullable);
861 }
862
863 #[test]
864 fn test_enum_getter() {
865 let mut data_type: DataType = DataType::default();
866 data_type.type_name = data_type::TypeName::Double as i32;
867 assert_eq!(
868 data_type::TypeName::Double,
869 data_type.get_type_name().unwrap()
870 );
871 }
872
873 #[test]
874 fn test_enum_unspecified() {
875 let mut data_type: DataType = DataType::default();
876 data_type.type_name = data_type::TypeName::TypeUnspecified as i32;
877 assert!(data_type.get_type_name().is_err());
878 }
879
880 #[test]
881 fn test_primitive_getter() {
882 let data_type: DataType = DataType::default();
883 let new_data_type = DataType {
884 is_nullable: data_type.get_is_nullable(),
885 ..Default::default()
886 };
887 assert!(!new_data_type.is_nullable);
888 }
889
890 #[test]
891 fn test_size() {
892 use static_assertions::const_assert_eq;
893 const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
896 }
897
898 #[test]
899 #[expect(deprecated)]
900 fn test_get_clean_watermark_index_in_pk_compat() {
901 use crate::catalog::Table;
902 use crate::common::{ColumnOrder, OrderType};
903
904 fn create_column_order(column_index: u32) -> ColumnOrder {
905 ColumnOrder {
906 column_index,
907 order_type: Some(OrderType::default()),
908 }
909 }
910
911 let table = Table {
913 clean_watermark_indices: vec![3, 2],
914 clean_watermark_index_in_pk: Some(0),
915 pk: vec![
916 create_column_order(1),
917 create_column_order(2),
918 create_column_order(3),
919 create_column_order(4),
920 ],
921 ..Default::default()
922 };
923 assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));
924
925 let table = Table {
927 clean_watermark_indices: vec![],
928 clean_watermark_index_in_pk: Some(1),
929 pk: vec![create_column_order(0), create_column_order(2)],
930 ..Default::default()
931 };
932 assert_eq!(table.get_clean_watermark_index_in_pk_compat(), Some(1));
933
934 let table = Table {
936 clean_watermark_indices: vec![],
937 clean_watermark_index_in_pk: None,
938 ..Default::default()
939 };
940 assert_eq!(table.get_clean_watermark_index_in_pk_compat(), None);
941 }
942}