1#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17#![expect(clippy::doc_markdown)]
19#![expect(clippy::upper_case_acronyms)]
20#![expect(clippy::needless_lifetimes)]
21#![expect(clippy::disallowed_methods)]
23#![expect(clippy::enum_variant_names)]
24#![expect(clippy::module_inception)]
25#![expect(clippy::large_enum_variant)]
27#![feature(step_trait)]
28
29pub mod id;
30
31use std::str::FromStr;
32
33use event_recovery::RecoveryEvent;
34use plan_common::AdditionalColumn;
35pub use prost::Message;
36use risingwave_error::tonic::ToTonicStatus;
37use thiserror::Error;
38
39use crate::common::WorkerType;
40use crate::ddl_service::streaming_job_resource_type;
41use crate::id::{FragmentId, SourceId, WorkerId};
42use crate::meta::event_log::event_recovery;
43use crate::stream_plan::PbStreamScanType;
44
45#[rustfmt::skip]
46#[cfg_attr(madsim, path = "sim/catalog.rs")]
47pub mod catalog;
48#[rustfmt::skip]
49#[cfg_attr(madsim, path = "sim/common.rs")]
50pub mod common;
51#[rustfmt::skip]
52#[cfg_attr(madsim, path = "sim/compute.rs")]
53pub mod compute;
54#[rustfmt::skip]
55#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
56pub mod cloud_service;
57#[rustfmt::skip]
58#[cfg_attr(madsim, path = "sim/data.rs")]
59pub mod data;
60#[rustfmt::skip]
61#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
62pub mod ddl_service;
63#[rustfmt::skip]
64#[cfg_attr(madsim, path = "sim/expr.rs")]
65pub mod expr;
66#[rustfmt::skip]
67#[cfg_attr(madsim, path = "sim/meta.rs")]
68pub mod meta;
69#[rustfmt::skip]
70#[cfg_attr(madsim, path = "sim/plan_common.rs")]
71pub mod plan_common;
72#[rustfmt::skip]
73#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
74pub mod batch_plan;
75#[rustfmt::skip]
76#[cfg_attr(madsim, path = "sim/task_service.rs")]
77pub mod task_service;
78#[rustfmt::skip]
79#[cfg_attr(madsim, path = "sim/connector_service.rs")]
80pub mod connector_service;
81#[rustfmt::skip]
82#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
83pub mod stream_plan;
84#[rustfmt::skip]
85#[cfg_attr(madsim, path = "sim/stream_service.rs")]
86pub mod stream_service;
87#[rustfmt::skip]
88#[cfg_attr(madsim, path = "sim/hummock.rs")]
89pub mod hummock;
90#[rustfmt::skip]
91#[cfg_attr(madsim, path = "sim/compactor.rs")]
92pub mod compactor;
93#[rustfmt::skip]
94#[cfg_attr(madsim, path = "sim/user.rs")]
95pub mod user;
96#[rustfmt::skip]
97#[cfg_attr(madsim, path = "sim/source.rs")]
98pub mod source;
99#[rustfmt::skip]
100#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
101pub mod monitor_service;
102#[rustfmt::skip]
103#[cfg_attr(madsim, path = "sim/backup_service.rs")]
104pub mod backup_service;
105#[rustfmt::skip]
106#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
107pub mod serverless_backfill_controller;
108#[rustfmt::skip]
109#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
110pub mod frontend_service;
111#[rustfmt::skip]
112#[cfg_attr(madsim, path = "sim/java_binding.rs")]
113pub mod java_binding;
114#[rustfmt::skip]
115#[cfg_attr(madsim, path = "sim/health.rs")]
116pub mod health;
117#[rustfmt::skip]
118#[path = "sim/telemetry.rs"]
119pub mod telemetry;
120#[rustfmt::skip]
121#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
122pub mod iceberg_compaction;
123
124#[rustfmt::skip]
125#[path = "sim/secret.rs"]
126pub mod secret;
127#[rustfmt::skip]
128#[path = "connector_service.serde.rs"]
129pub mod connector_service_serde;
130#[rustfmt::skip]
131#[path = "catalog.serde.rs"]
132pub mod catalog_serde;
133#[rustfmt::skip]
134#[path = "common.serde.rs"]
135pub mod common_serde;
136#[rustfmt::skip]
137#[path = "compute.serde.rs"]
138pub mod compute_serde;
139#[rustfmt::skip]
140#[path = "cloud_service.serde.rs"]
141pub mod cloud_service_serde;
142#[rustfmt::skip]
143#[path = "data.serde.rs"]
144pub mod data_serde;
145#[rustfmt::skip]
146#[path = "ddl_service.serde.rs"]
147pub mod ddl_service_serde;
148#[rustfmt::skip]
149#[path = "expr.serde.rs"]
150pub mod expr_serde;
151#[rustfmt::skip]
152#[path = "meta.serde.rs"]
153pub mod meta_serde;
154#[rustfmt::skip]
155#[path = "plan_common.serde.rs"]
156pub mod plan_common_serde;
157#[rustfmt::skip]
158#[path = "batch_plan.serde.rs"]
159pub mod batch_plan_serde;
160#[rustfmt::skip]
161#[path = "task_service.serde.rs"]
162pub mod task_service_serde;
163#[rustfmt::skip]
164#[path = "stream_plan.serde.rs"]
165pub mod stream_plan_serde;
166#[rustfmt::skip]
167#[path = "stream_service.serde.rs"]
168pub mod stream_service_serde;
169#[rustfmt::skip]
170#[path = "hummock.serde.rs"]
171pub mod hummock_serde;
172#[rustfmt::skip]
173#[path = "compactor.serde.rs"]
174pub mod compactor_serde;
175#[rustfmt::skip]
176#[path = "user.serde.rs"]
177pub mod user_serde;
178#[rustfmt::skip]
179#[path = "source.serde.rs"]
180pub mod source_serde;
181#[rustfmt::skip]
182#[path = "monitor_service.serde.rs"]
183pub mod monitor_service_serde;
184#[rustfmt::skip]
185#[path = "backup_service.serde.rs"]
186pub mod backup_service_serde;
187#[rustfmt::skip]
188#[path = "java_binding.serde.rs"]
189pub mod java_binding_serde;
190#[rustfmt::skip]
191#[path = "telemetry.serde.rs"]
192pub mod telemetry_serde;
193
194#[rustfmt::skip]
195#[path = "secret.serde.rs"]
196pub mod secret_serde;
197#[rustfmt::skip]
198#[path = "serverless_backfill_controller.serde.rs"]
199pub mod serverless_backfill_controller_serde;
200
/// Error reporting that a field was not found.
///
/// The wrapped string is the field name; it is interpolated into the error message
/// (``field `<name>` not found``).
#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
pub struct PbFieldNotFound(pub &'static str);
204
205impl From<PbFieldNotFound> for tonic::Status {
206 fn from(e: PbFieldNotFound) -> Self {
207 e.to_status_unnamed(tonic::Code::Internal)
208 }
209}
210
211impl FromStr for crate::expr::table_function::PbType {
212 type Err = ();
213
214 fn from_str(s: &str) -> Result<Self, Self::Err> {
215 Self::from_str_name(&s.to_uppercase()).ok_or(())
216 }
217}
218
219impl FromStr for crate::expr::agg_call::PbKind {
220 type Err = ();
221
222 fn from_str(s: &str) -> Result<Self, Self::Err> {
223 Self::from_str_name(&s.to_uppercase()).ok_or(())
224 }
225}
226
227impl stream_plan::MaterializeNode {
228 pub fn dist_key_indices(&self) -> Vec<u32> {
229 self.get_table()
230 .unwrap()
231 .distribution_key
232 .iter()
233 .map(|i| *i as u32)
234 .collect()
235 }
236
237 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
238 self.get_table()
239 .unwrap()
240 .columns
241 .iter()
242 .map(|c| c.get_column_desc().unwrap().clone())
243 .collect()
244 }
245}
246
247impl stream_plan::StreamScanNode {
248 pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
250 self.upstream_column_ids
251 .iter()
252 .map(|id| {
253 (self.table_desc.as_ref().unwrap().columns.iter())
254 .find(|c| c.column_id == *id)
255 .unwrap()
256 .clone()
257 })
258 .collect()
259 }
260}
261
262impl stream_plan::SourceBackfillNode {
263 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
264 self.columns
265 .iter()
266 .map(|c| c.column_desc.as_ref().unwrap().clone())
267 .collect()
268 }
269}
270
271impl common::WorkerNode {
273 pub fn compute_node_parallelism(&self) -> usize {
274 assert_eq!(self.r#type(), WorkerType::ComputeNode);
275 self.property
276 .as_ref()
277 .expect("property should be exist")
278 .parallelism as usize
279 }
280
281 fn compactor_node_parallelism(&self) -> usize {
282 assert_eq!(self.r#type(), WorkerType::Compactor);
283 self.property
284 .as_ref()
285 .expect("property should be exist")
286 .parallelism as usize
287 }
288
289 pub fn parallelism(&self) -> Option<usize> {
290 match self.r#type() {
291 WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
292 WorkerType::Compactor => Some(self.compactor_node_parallelism()),
293 _ => None,
294 }
295 }
296
297 pub fn resource_group(&self) -> Option<String> {
298 self.property
299 .as_ref()
300 .and_then(|p| p.resource_group.clone())
301 }
302}
303
304impl stream_plan::SourceNode {
305 pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
306 Some(
307 self.source_inner
308 .as_ref()?
309 .columns
310 .iter()
311 .map(|c| c.get_column_desc().unwrap().clone())
312 .collect(),
313 )
314 }
315}
316
317impl meta::table_fragments::ActorStatus {
318 pub fn worker_id(&self) -> WorkerId {
319 self.location
320 .as_ref()
321 .expect("actor location should be exist")
322 .worker_node_id
323 }
324}
325
326impl common::WorkerNode {
327 pub fn is_streaming_schedulable(&self) -> bool {
328 let property = self.property.as_ref();
329 property.is_some_and(|p| p.is_streaming) && !property.is_some_and(|p| p.is_unschedulable)
330 }
331}
332
333impl common::ActorLocation {
334 pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
335 Some(Self { worker_node_id })
336 }
337}
338
339impl meta::event_log::EventRecovery {
340 pub fn event_type(&self) -> &str {
341 match self.recovery_event.as_ref() {
342 Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
343 Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
344 Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
345 Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
346 Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
347 Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
348 None => "UNKNOWN_RECOVERY_EVENT",
349 }
350 }
351
352 pub fn database_recovery_start(database_id: u32) -> Self {
353 Self {
354 recovery_event: Some(RecoveryEvent::DatabaseStart(
355 event_recovery::DatabaseRecoveryStart { database_id },
356 )),
357 }
358 }
359
360 pub fn database_recovery_failure(database_id: u32) -> Self {
361 Self {
362 recovery_event: Some(RecoveryEvent::DatabaseFailure(
363 event_recovery::DatabaseRecoveryFailure { database_id },
364 )),
365 }
366 }
367
368 pub fn database_recovery_success(database_id: u32) -> Self {
369 Self {
370 recovery_event: Some(RecoveryEvent::DatabaseSuccess(
371 event_recovery::DatabaseRecoverySuccess { database_id },
372 )),
373 }
374 }
375
376 pub fn global_recovery_start(reason: String) -> Self {
377 Self {
378 recovery_event: Some(RecoveryEvent::GlobalStart(
379 event_recovery::GlobalRecoveryStart { reason },
380 )),
381 }
382 }
383
384 pub fn global_recovery_success(
385 reason: String,
386 duration_secs: f32,
387 running_database_ids: Vec<u32>,
388 recovering_database_ids: Vec<u32>,
389 ) -> Self {
390 Self {
391 recovery_event: Some(RecoveryEvent::GlobalSuccess(
392 event_recovery::GlobalRecoverySuccess {
393 reason,
394 duration_secs,
395 running_database_ids,
396 recovering_database_ids,
397 },
398 )),
399 }
400 }
401
402 pub fn global_recovery_failure(reason: String, error: String) -> Self {
403 Self {
404 recovery_event: Some(RecoveryEvent::GlobalFailure(
405 event_recovery::GlobalRecoveryFailure { reason, error },
406 )),
407 }
408 }
409}
410
411impl stream_plan::StreamNode {
412 pub fn find_stream_source(&self) -> Option<SourceId> {
416 if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
417 self.node_body.as_ref()
418 && let Some(inner) = &source.source_inner
419 {
420 return Some(inner.source_id);
421 }
422
423 for child in &self.input {
424 if let Some(source) = child.find_stream_source() {
425 return Some(source);
426 }
427 }
428
429 None
430 }
431
432 pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
440 if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
441 self.node_body.as_ref()
442 {
443 if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
444 self.input[0].node_body.as_ref().unwrap()
445 {
446 return Some((source.upstream_source_id, merge.upstream_fragment_id));
449 } else {
450 unreachable!(
451 "source backfill must have a merge node as its input: {:?}",
452 self
453 );
454 }
455 }
456
457 for child in &self.input {
458 if let Some(source) = child.find_source_backfill() {
459 return Some(source);
460 }
461 }
462
463 None
464 }
465}
466impl stream_plan::Dispatcher {
467 pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
468 stream_plan::DispatchStrategy {
469 r#type: self.r#type,
470 dist_key_indices: self.dist_key_indices.clone(),
471 output_mapping: self.output_mapping.clone(),
472 }
473 }
474}
475
476impl stream_plan::DispatchOutputMapping {
477 pub fn identical(len: usize) -> Self {
479 Self {
480 indices: (0..len as u32).collect(),
481 types: Vec::new(),
482 }
483 }
484
485 pub fn simple(indices: Vec<u32>) -> Self {
487 Self {
488 indices,
489 types: Vec::new(),
490 }
491 }
492
493 pub fn into_simple_indices(self) -> Vec<u32> {
495 assert!(
496 self.types.is_empty(),
497 "types must be empty for simple mapping"
498 );
499 self.indices
500 }
501}
502
impl catalog::StreamSourceInfo {
    /// Returns the value of the `cdc_source_job` flag.
    pub fn is_shared(&self) -> bool {
        self.cdc_source_job
    }
}
509
510impl stream_plan::PbStreamScanType {
511 pub fn is_reschedulable(&self, is_online: bool) -> bool {
512 match self {
513 PbStreamScanType::Unspecified => {
514 unreachable!()
515 }
516 PbStreamScanType::UpstreamOnly => false,
518 PbStreamScanType::ArrangementBackfill => true,
519 PbStreamScanType::CrossDbSnapshotBackfill => true,
520 PbStreamScanType::SnapshotBackfill => !is_online,
521 PbStreamScanType::Chain | PbStreamScanType::Rearrange | PbStreamScanType::Backfill => {
522 false
523 }
524 }
525 }
526}
527
528impl catalog::Sink {
529 pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
532
533 pub fn unique_identity(&self) -> String {
534 format!("{}", self.id)
536 }
537
538 #[allow(deprecated)]
542 pub fn ignore_delete(&self) -> bool {
543 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
544 }
545}
546
547impl stream_plan::SinkDesc {
548 #[allow(deprecated)]
552 pub fn ignore_delete(&self) -> bool {
553 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
554 }
555}
556
557impl connector_service::SinkParam {
558 #[allow(deprecated)]
562 pub fn ignore_delete(&self) -> bool {
563 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
564 }
565}
566
567impl catalog::Table {
568 #[expect(deprecated)]
579 pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
580 if !self.clean_watermark_indices.is_empty() {
581 self.clean_watermark_indices.clone()
583 } else if let Some(pk_idx) = self
584 .clean_watermark_index_in_pk
585 .or_else(|| (!self.pk.is_empty()).then_some(0))
587 {
588 if let Some(col_order) = self.pk.get(pk_idx as usize) {
591 vec![col_order.column_index]
592 } else {
593 if cfg!(debug_assertions) {
594 panic!("clean_watermark_index_in_pk is out of range: {self:?}");
595 }
596 vec![]
597 }
598 } else {
599 vec![]
600 }
601 }
602
603 pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
616 let clean_watermark_column_indices = self.get_clean_watermark_column_indices();
617
618 let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
620 .iter()
621 .filter_map(|&col_idx| {
622 self.pk
623 .iter()
624 .position(|col_order| col_order.column_index == col_idx)
625 })
626 .collect();
627
628 clean_watermark_indices_in_pk.iter().min().copied()
630 }
631}
632
633impl std::fmt::Debug for meta::SystemParams {
634 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
638 f.debug_struct("SystemParams").finish_non_exhaustive()
639 }
640}
641
642impl std::fmt::Debug for data::DataType {
645 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
646 let data::DataType {
647 precision,
648 scale,
649 interval_type,
650 field_type,
651 field_names,
652 field_ids,
653 type_name,
654 is_nullable: _,
656 } = self;
657
658 let type_name = data::data_type::TypeName::try_from(*type_name)
659 .map(|t| t.as_str_name())
660 .unwrap_or("Unknown");
661
662 let mut s = f.debug_struct(type_name);
663 if self.precision != 0 {
664 s.field("precision", precision);
665 }
666 if self.scale != 0 {
667 s.field("scale", scale);
668 }
669 if self.interval_type != 0 {
670 s.field("interval_type", interval_type);
671 }
672 if !self.field_type.is_empty() {
673 s.field("field_type", field_type);
674 }
675 if !self.field_names.is_empty() {
676 s.field("field_names", field_names);
677 }
678 if !self.field_ids.is_empty() {
679 s.field("field_ids", field_ids);
680 }
681 s.finish()
682 }
683}
684
685impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
686 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
687 match self {
688 Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
689 Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
690 }
691 }
692}
693
694impl std::fmt::Debug for plan_common::ColumnDesc {
695 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
696 let plan_common::ColumnDesc {
698 column_type,
699 column_id,
700 name,
701 description,
702 additional_column_type,
703 additional_column,
704 generated_or_default_column,
705 version,
706 nullable,
707 } = self;
708
709 let mut s = f.debug_struct("ColumnDesc");
710 if let Some(column_type) = column_type {
711 s.field("column_type", column_type);
712 } else {
713 s.field("column_type", &"Unknown");
714 }
715 s.field("column_id", column_id).field("name", name);
716 if let Some(description) = description {
717 s.field("description", description);
718 }
719 if self.additional_column_type != 0 {
720 s.field("additional_column_type", additional_column_type);
721 }
722 s.field("version", version);
723 if let Some(AdditionalColumn {
724 column_type: Some(column_type),
725 }) = additional_column
726 {
727 s.field("additional_column", &column_type);
729 }
730 if let Some(generated_or_default_column) = generated_or_default_column {
731 s.field("generated_or_default_column", &generated_or_default_column);
732 }
733 s.field("nullable", nullable);
734 s.finish()
735 }
736}
737
738impl expr::UserDefinedFunction {
739 pub fn name_in_runtime(&self) -> Option<&str> {
740 if self.version() < expr::UdfExprVersion::NameInRuntime {
741 if self.language == "rust" || self.language == "wasm" {
742 Some(&self.name)
746 } else {
747 self.identifier.as_deref()
749 }
750 } else {
751 self.identifier.as_deref()
753 }
754 }
755}
756
757impl expr::UserDefinedFunctionMetadata {
758 pub fn name_in_runtime(&self) -> Option<&str> {
759 if self.version() < expr::UdfExprVersion::NameInRuntime {
760 if self.language == "rust" || self.language == "wasm" {
761 let old_identifier = self
766 .identifier
767 .as_ref()
768 .expect("Rust/WASM UDF must have identifier");
769 Some(
770 old_identifier
771 .split_once("(")
772 .expect("the old identifier must contain `(`")
773 .0,
774 )
775 } else {
776 self.identifier.as_deref()
778 }
779 } else {
780 self.identifier.as_deref()
782 }
783 }
784}
785
786impl streaming_job_resource_type::ResourceType {
787 pub fn resource_group(&self) -> Option<String> {
788 match self {
789 streaming_job_resource_type::ResourceType::Regular(_) => None,
790 streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
791 | streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group) => {
792 Some(group.clone())
793 }
794 }
795 }
796}
797
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    /// A message-typed field can be read back through its getter.
    #[test]
    fn test_getter() {
        let field = Field {
            data_type: Some(DataType {
                is_nullable: true,
                ..Default::default()
            }),
            name: String::new(),
        };
        assert!(field.get_data_type().unwrap().is_nullable);
    }

    /// An enum-typed field set to a specified value is returned by its getter.
    #[test]
    fn test_enum_getter() {
        let double = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        assert_eq!(
            data_type::TypeName::Double,
            double.get_type_name().unwrap()
        );
    }

    /// The enum getter reports an error for the unspecified value.
    #[test]
    fn test_enum_unspecified() {
        let unspecified = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(unspecified.get_type_name().is_err());
    }

    /// A primitive getter on a default message yields the default value.
    #[test]
    fn test_primitive_getter() {
        let default_type = DataType::default();
        let rebuilt = DataType {
            is_nullable: default_type.get_is_nullable(),
            ..Default::default()
        };
        assert!(!rebuilt.is_nullable);
    }

    /// Pins the in-memory size of `NodeBody` at compile time.
    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;
        const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
    }

    #[test]
    #[expect(deprecated)]
    fn test_get_clean_watermark_index_in_pk_compat() {
        use crate::catalog::Table;
        use crate::common::{ColumnOrder, OrderType};

        // Builds a pk from a list of column indices, each with the default order type.
        fn pk_of(column_indices: &[u32]) -> Vec<ColumnOrder> {
            column_indices
                .iter()
                .map(|&column_index| ColumnOrder {
                    column_index,
                    order_type: Some(OrderType::default()),
                })
                .collect()
        }

        // `clean_watermark_indices` wins over `clean_watermark_index_in_pk`; columns 3 and 2
        // sit at pk positions 2 and 1, and the smaller position is reported.
        let with_indices = Table {
            clean_watermark_indices: vec![3, 2],
            clean_watermark_index_in_pk: Some(0),
            pk: pk_of(&[1, 2, 3, 4]),
            ..Default::default()
        };
        assert_eq!(
            with_indices.get_clean_watermark_index_in_pk_compat(),
            Some(1)
        );

        // Only the position-in-pk field is set: it round-trips through the column index.
        let with_pk_position = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: Some(1),
            pk: pk_of(&[0, 2]),
            ..Default::default()
        };
        assert_eq!(
            with_pk_position.get_clean_watermark_index_in_pk_compat(),
            Some(1)
        );

        // Nothing configured and an empty pk: no watermark position.
        let unconfigured = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: None,
            ..Default::default()
        };
        assert_eq!(unconfigured.get_clean_watermark_index_in_pk_compat(), None);
    }
}