1#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17#![expect(clippy::doc_markdown)]
19#![expect(clippy::upper_case_acronyms)]
20#![expect(clippy::needless_lifetimes)]
21#![expect(clippy::disallowed_methods)]
23#![expect(clippy::enum_variant_names)]
24#![expect(clippy::module_inception)]
25#![expect(clippy::large_enum_variant)]
27
28pub mod id;
29
30use std::str::FromStr;
31
32use event_recovery::RecoveryEvent;
33use plan_common::AdditionalColumn;
34pub use prost::Message;
35use risingwave_error::tonic::ToTonicStatus;
36use thiserror::Error;
37
38use crate::common::WorkerType;
39use crate::ddl_service::streaming_job_resource_type;
40use crate::id::{FragmentId, SourceId, WorkerId};
41use crate::meta::event_log::event_recovery;
42use crate::stream_plan::PbStreamScanType;
43
44#[rustfmt::skip]
45#[cfg_attr(madsim, path = "sim/catalog.rs")]
46pub mod catalog;
47#[rustfmt::skip]
48#[cfg_attr(madsim, path = "sim/common.rs")]
49pub mod common;
50#[rustfmt::skip]
51#[cfg_attr(madsim, path = "sim/compute.rs")]
52pub mod compute;
53#[rustfmt::skip]
54#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
55pub mod cloud_service;
56#[rustfmt::skip]
57#[cfg_attr(madsim, path = "sim/data.rs")]
58pub mod data;
59#[rustfmt::skip]
60#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
61pub mod ddl_service;
62#[rustfmt::skip]
63#[cfg_attr(madsim, path = "sim/expr.rs")]
64pub mod expr;
65#[rustfmt::skip]
66#[cfg_attr(madsim, path = "sim/meta.rs")]
67pub mod meta;
68#[rustfmt::skip]
69#[cfg_attr(madsim, path = "sim/plan_common.rs")]
70pub mod plan_common;
71#[rustfmt::skip]
72#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
73pub mod batch_plan;
74#[rustfmt::skip]
75#[cfg_attr(madsim, path = "sim/task_service.rs")]
76pub mod task_service;
77#[rustfmt::skip]
78#[cfg_attr(madsim, path = "sim/connector_service.rs")]
79pub mod connector_service;
80#[rustfmt::skip]
81#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
82pub mod stream_plan;
83#[rustfmt::skip]
84#[cfg_attr(madsim, path = "sim/stream_service.rs")]
85pub mod stream_service;
86#[rustfmt::skip]
87#[cfg_attr(madsim, path = "sim/hummock.rs")]
88pub mod hummock;
89#[rustfmt::skip]
90#[cfg_attr(madsim, path = "sim/compactor.rs")]
91pub mod compactor;
92#[rustfmt::skip]
93#[cfg_attr(madsim, path = "sim/user.rs")]
94pub mod user;
95#[rustfmt::skip]
96#[cfg_attr(madsim, path = "sim/source.rs")]
97pub mod source;
98#[rustfmt::skip]
99#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
100pub mod monitor_service;
101#[rustfmt::skip]
102#[cfg_attr(madsim, path = "sim/backup_service.rs")]
103pub mod backup_service;
104#[rustfmt::skip]
105#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
106pub mod serverless_backfill_controller;
107#[rustfmt::skip]
108#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
109pub mod frontend_service;
110#[rustfmt::skip]
111#[cfg_attr(madsim, path = "sim/java_binding.rs")]
112pub mod java_binding;
113#[rustfmt::skip]
114#[cfg_attr(madsim, path = "sim/health.rs")]
115pub mod health;
116#[rustfmt::skip]
117#[path = "sim/telemetry.rs"]
118pub mod telemetry;
119#[rustfmt::skip]
120#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
121pub mod iceberg_compaction;
122
123#[rustfmt::skip]
124#[path = "sim/secret.rs"]
125pub mod secret;
126#[rustfmt::skip]
127#[path = "connector_service.serde.rs"]
128pub mod connector_service_serde;
129#[rustfmt::skip]
130#[path = "catalog.serde.rs"]
131pub mod catalog_serde;
132#[rustfmt::skip]
133#[path = "common.serde.rs"]
134pub mod common_serde;
135#[rustfmt::skip]
136#[path = "compute.serde.rs"]
137pub mod compute_serde;
138#[rustfmt::skip]
139#[path = "cloud_service.serde.rs"]
140pub mod cloud_service_serde;
141#[rustfmt::skip]
142#[path = "data.serde.rs"]
143pub mod data_serde;
144#[rustfmt::skip]
145#[path = "ddl_service.serde.rs"]
146pub mod ddl_service_serde;
147#[rustfmt::skip]
148#[path = "expr.serde.rs"]
149pub mod expr_serde;
150#[rustfmt::skip]
151#[path = "meta.serde.rs"]
152pub mod meta_serde;
153#[rustfmt::skip]
154#[path = "plan_common.serde.rs"]
155pub mod plan_common_serde;
156#[rustfmt::skip]
157#[path = "batch_plan.serde.rs"]
158pub mod batch_plan_serde;
159#[rustfmt::skip]
160#[path = "task_service.serde.rs"]
161pub mod task_service_serde;
162#[rustfmt::skip]
163#[path = "stream_plan.serde.rs"]
164pub mod stream_plan_serde;
165#[rustfmt::skip]
166#[path = "stream_service.serde.rs"]
167pub mod stream_service_serde;
168#[rustfmt::skip]
169#[path = "hummock.serde.rs"]
170pub mod hummock_serde;
171#[rustfmt::skip]
172#[path = "compactor.serde.rs"]
173pub mod compactor_serde;
174#[rustfmt::skip]
175#[path = "user.serde.rs"]
176pub mod user_serde;
177#[rustfmt::skip]
178#[path = "source.serde.rs"]
179pub mod source_serde;
180#[rustfmt::skip]
181#[path = "monitor_service.serde.rs"]
182pub mod monitor_service_serde;
183#[rustfmt::skip]
184#[path = "backup_service.serde.rs"]
185pub mod backup_service_serde;
186#[rustfmt::skip]
187#[path = "java_binding.serde.rs"]
188pub mod java_binding_serde;
189#[rustfmt::skip]
190#[path = "telemetry.serde.rs"]
191pub mod telemetry_serde;
192
193#[rustfmt::skip]
194#[path = "secret.serde.rs"]
195pub mod secret_serde;
196#[rustfmt::skip]
197#[path = "serverless_backfill_controller.serde.rs"]
198pub mod serverless_backfill_controller_serde;
199
200#[derive(Clone, PartialEq, Eq, Debug, Error)]
201#[error("field `{0}` not found")]
202pub struct PbFieldNotFound(pub &'static str);
203
204impl From<PbFieldNotFound> for tonic::Status {
205 fn from(e: PbFieldNotFound) -> Self {
206 e.to_status_unnamed(tonic::Code::Internal)
207 }
208}
209
210impl FromStr for crate::expr::table_function::PbType {
211 type Err = ();
212
213 fn from_str(s: &str) -> Result<Self, Self::Err> {
214 Self::from_str_name(&s.to_uppercase()).ok_or(())
215 }
216}
217
218impl FromStr for crate::expr::agg_call::PbKind {
219 type Err = ();
220
221 fn from_str(s: &str) -> Result<Self, Self::Err> {
222 Self::from_str_name(&s.to_uppercase()).ok_or(())
223 }
224}
225
226impl stream_plan::MaterializeNode {
227 pub fn dist_key_indices(&self) -> Vec<u32> {
228 self.get_table()
229 .unwrap()
230 .distribution_key
231 .iter()
232 .map(|i| *i as u32)
233 .collect()
234 }
235
236 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
237 self.get_table()
238 .unwrap()
239 .columns
240 .iter()
241 .map(|c| c.get_column_desc().unwrap().clone())
242 .collect()
243 }
244}
245
246impl stream_plan::StreamScanNode {
247 pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
249 self.upstream_column_ids
250 .iter()
251 .map(|id| {
252 (self.table_desc.as_ref().unwrap().columns.iter())
253 .find(|c| c.column_id == *id)
254 .unwrap()
255 .clone()
256 })
257 .collect()
258 }
259}
260
261impl stream_plan::SourceBackfillNode {
262 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
263 self.columns
264 .iter()
265 .map(|c| c.column_desc.as_ref().unwrap().clone())
266 .collect()
267 }
268}
269
270impl common::WorkerNode {
272 pub fn compute_node_parallelism(&self) -> usize {
273 assert_eq!(self.r#type(), WorkerType::ComputeNode);
274 self.property
275 .as_ref()
276 .expect("property should be exist")
277 .parallelism as usize
278 }
279
280 fn compactor_node_parallelism(&self) -> usize {
281 assert_eq!(self.r#type(), WorkerType::Compactor);
282 self.property
283 .as_ref()
284 .expect("property should be exist")
285 .parallelism as usize
286 }
287
288 pub fn parallelism(&self) -> Option<usize> {
289 match self.r#type() {
290 WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
291 WorkerType::Compactor => Some(self.compactor_node_parallelism()),
292 _ => None,
293 }
294 }
295
296 pub fn resource_group(&self) -> Option<String> {
297 self.property
298 .as_ref()
299 .and_then(|p| p.resource_group.clone())
300 }
301}
302
303impl stream_plan::SourceNode {
304 pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
305 Some(
306 self.source_inner
307 .as_ref()?
308 .columns
309 .iter()
310 .map(|c| c.get_column_desc().unwrap().clone())
311 .collect(),
312 )
313 }
314}
315
316impl meta::table_fragments::ActorStatus {
317 pub fn worker_id(&self) -> WorkerId {
318 self.location
319 .as_ref()
320 .expect("actor location should be exist")
321 .worker_node_id
322 }
323}
324
325impl common::WorkerNode {
326 pub fn is_streaming_schedulable(&self) -> bool {
327 let property = self.property.as_ref();
328 property.is_some_and(|p| p.is_streaming) && !property.is_some_and(|p| p.is_unschedulable)
329 }
330}
331
332impl common::ActorLocation {
333 pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
334 Some(Self { worker_node_id })
335 }
336}
337
338impl meta::event_log::EventRecovery {
339 pub fn event_type(&self) -> &str {
340 match self.recovery_event.as_ref() {
341 Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
342 Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
343 Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
344 Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
345 Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
346 Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
347 None => "UNKNOWN_RECOVERY_EVENT",
348 }
349 }
350
351 pub fn database_recovery_start(database_id: u32) -> Self {
352 Self {
353 recovery_event: Some(RecoveryEvent::DatabaseStart(
354 event_recovery::DatabaseRecoveryStart { database_id },
355 )),
356 }
357 }
358
359 pub fn database_recovery_failure(database_id: u32) -> Self {
360 Self {
361 recovery_event: Some(RecoveryEvent::DatabaseFailure(
362 event_recovery::DatabaseRecoveryFailure { database_id },
363 )),
364 }
365 }
366
367 pub fn database_recovery_success(database_id: u32) -> Self {
368 Self {
369 recovery_event: Some(RecoveryEvent::DatabaseSuccess(
370 event_recovery::DatabaseRecoverySuccess { database_id },
371 )),
372 }
373 }
374
375 pub fn global_recovery_start(reason: String) -> Self {
376 Self {
377 recovery_event: Some(RecoveryEvent::GlobalStart(
378 event_recovery::GlobalRecoveryStart { reason },
379 )),
380 }
381 }
382
383 pub fn global_recovery_success(
384 reason: String,
385 duration_secs: f32,
386 running_database_ids: Vec<u32>,
387 recovering_database_ids: Vec<u32>,
388 ) -> Self {
389 Self {
390 recovery_event: Some(RecoveryEvent::GlobalSuccess(
391 event_recovery::GlobalRecoverySuccess {
392 reason,
393 duration_secs,
394 running_database_ids,
395 recovering_database_ids,
396 },
397 )),
398 }
399 }
400
401 pub fn global_recovery_failure(reason: String, error: String) -> Self {
402 Self {
403 recovery_event: Some(RecoveryEvent::GlobalFailure(
404 event_recovery::GlobalRecoveryFailure { reason, error },
405 )),
406 }
407 }
408}
409
410impl stream_plan::StreamNode {
411 pub fn find_stream_source(&self) -> Option<SourceId> {
415 if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
416 self.node_body.as_ref()
417 && let Some(inner) = &source.source_inner
418 {
419 return Some(inner.source_id);
420 }
421
422 for child in &self.input {
423 if let Some(source) = child.find_stream_source() {
424 return Some(source);
425 }
426 }
427
428 None
429 }
430
431 pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
439 if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
440 self.node_body.as_ref()
441 {
442 if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
443 self.input[0].node_body.as_ref().unwrap()
444 {
445 return Some((source.upstream_source_id, merge.upstream_fragment_id));
448 } else {
449 unreachable!(
450 "source backfill must have a merge node as its input: {:?}",
451 self
452 );
453 }
454 }
455
456 for child in &self.input {
457 if let Some(source) = child.find_source_backfill() {
458 return Some(source);
459 }
460 }
461
462 None
463 }
464}
465impl stream_plan::Dispatcher {
466 pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
467 stream_plan::DispatchStrategy {
468 r#type: self.r#type,
469 dist_key_indices: self.dist_key_indices.clone(),
470 output_mapping: self.output_mapping.clone(),
471 }
472 }
473}
474
475impl stream_plan::DispatchOutputMapping {
476 pub fn identical(len: usize) -> Self {
478 Self {
479 indices: (0..len as u32).collect(),
480 types: Vec::new(),
481 }
482 }
483
484 pub fn simple(indices: Vec<u32>) -> Self {
486 Self {
487 indices,
488 types: Vec::new(),
489 }
490 }
491
492 pub fn into_simple_indices(self) -> Vec<u32> {
494 assert!(
495 self.types.is_empty(),
496 "types must be empty for simple mapping"
497 );
498 self.indices
499 }
500}
501
502impl catalog::StreamSourceInfo {
503 pub fn is_shared(&self) -> bool {
505 self.cdc_source_job
506 }
507}
508
509impl stream_plan::PbStreamScanType {
510 pub fn is_reschedulable(&self) -> bool {
511 match self {
512 PbStreamScanType::UpstreamOnly => false,
514 PbStreamScanType::ArrangementBackfill => true,
515 PbStreamScanType::CrossDbSnapshotBackfill => true,
516 PbStreamScanType::SnapshotBackfill => true,
517 _ => false,
518 }
519 }
520}
521
522impl catalog::Sink {
523 pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
526
527 pub fn unique_identity(&self) -> String {
528 format!("{}", self.id)
530 }
531
532 #[allow(deprecated)]
536 pub fn ignore_delete(&self) -> bool {
537 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
538 }
539}
540
541impl stream_plan::SinkDesc {
542 #[allow(deprecated)]
546 pub fn ignore_delete(&self) -> bool {
547 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
548 }
549}
550
551impl connector_service::SinkParam {
552 #[allow(deprecated)]
556 pub fn ignore_delete(&self) -> bool {
557 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
558 }
559}
560
561impl catalog::Table {
562 #[expect(deprecated)]
573 pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
574 if !self.clean_watermark_indices.is_empty() {
575 self.clean_watermark_indices.clone()
577 } else if let Some(pk_idx) = self
578 .clean_watermark_index_in_pk
579 .or_else(|| (!self.pk.is_empty()).then_some(0))
581 {
582 if let Some(col_order) = self.pk.get(pk_idx as usize) {
585 vec![col_order.column_index]
586 } else {
587 if cfg!(debug_assertions) {
588 panic!("clean_watermark_index_in_pk is out of range: {self:?}");
589 }
590 vec![]
591 }
592 } else {
593 vec![]
594 }
595 }
596
597 pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
610 let clean_watermark_column_indices = self.get_clean_watermark_column_indices();
611
612 let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
614 .iter()
615 .filter_map(|&col_idx| {
616 self.pk
617 .iter()
618 .position(|col_order| col_order.column_index == col_idx)
619 })
620 .collect();
621
622 clean_watermark_indices_in_pk.iter().min().copied()
624 }
625}
626
627impl std::fmt::Debug for meta::SystemParams {
628 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632 f.debug_struct("SystemParams").finish_non_exhaustive()
633 }
634}
635
636impl std::fmt::Debug for data::DataType {
639 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
640 let data::DataType {
641 precision,
642 scale,
643 interval_type,
644 field_type,
645 field_names,
646 field_ids,
647 type_name,
648 is_nullable: _,
650 } = self;
651
652 let type_name = data::data_type::TypeName::try_from(*type_name)
653 .map(|t| t.as_str_name())
654 .unwrap_or("Unknown");
655
656 let mut s = f.debug_struct(type_name);
657 if self.precision != 0 {
658 s.field("precision", precision);
659 }
660 if self.scale != 0 {
661 s.field("scale", scale);
662 }
663 if self.interval_type != 0 {
664 s.field("interval_type", interval_type);
665 }
666 if !self.field_type.is_empty() {
667 s.field("field_type", field_type);
668 }
669 if !self.field_names.is_empty() {
670 s.field("field_names", field_names);
671 }
672 if !self.field_ids.is_empty() {
673 s.field("field_ids", field_ids);
674 }
675 s.finish()
676 }
677}
678
679impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
680 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681 match self {
682 Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
683 Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
684 }
685 }
686}
687
688impl std::fmt::Debug for plan_common::ColumnDesc {
689 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690 let plan_common::ColumnDesc {
692 column_type,
693 column_id,
694 name,
695 description,
696 additional_column_type,
697 additional_column,
698 generated_or_default_column,
699 version,
700 nullable,
701 } = self;
702
703 let mut s = f.debug_struct("ColumnDesc");
704 if let Some(column_type) = column_type {
705 s.field("column_type", column_type);
706 } else {
707 s.field("column_type", &"Unknown");
708 }
709 s.field("column_id", column_id).field("name", name);
710 if let Some(description) = description {
711 s.field("description", description);
712 }
713 if self.additional_column_type != 0 {
714 s.field("additional_column_type", additional_column_type);
715 }
716 s.field("version", version);
717 if let Some(AdditionalColumn {
718 column_type: Some(column_type),
719 }) = additional_column
720 {
721 s.field("additional_column", &column_type);
723 }
724 if let Some(generated_or_default_column) = generated_or_default_column {
725 s.field("generated_or_default_column", &generated_or_default_column);
726 }
727 s.field("nullable", nullable);
728 s.finish()
729 }
730}
731
732impl expr::UserDefinedFunction {
733 pub fn name_in_runtime(&self) -> Option<&str> {
734 if self.version() < expr::UdfExprVersion::NameInRuntime {
735 if self.language == "rust" || self.language == "wasm" {
736 Some(&self.name)
740 } else {
741 self.identifier.as_deref()
743 }
744 } else {
745 self.identifier.as_deref()
747 }
748 }
749}
750
751impl expr::UserDefinedFunctionMetadata {
752 pub fn name_in_runtime(&self) -> Option<&str> {
753 if self.version() < expr::UdfExprVersion::NameInRuntime {
754 if self.language == "rust" || self.language == "wasm" {
755 let old_identifier = self
760 .identifier
761 .as_ref()
762 .expect("Rust/WASM UDF must have identifier");
763 Some(
764 old_identifier
765 .split_once("(")
766 .expect("the old identifier must contain `(`")
767 .0,
768 )
769 } else {
770 self.identifier.as_deref()
772 }
773 } else {
774 self.identifier.as_deref()
776 }
777 }
778}
779
780impl streaming_job_resource_type::ResourceType {
781 pub fn resource_group(&self) -> Option<String> {
782 match self {
783 streaming_job_resource_type::ResourceType::Regular(_) => None,
784 streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
785 | streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group) => {
786 Some(group.clone())
787 }
788 }
789 }
790}
791
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    /// A message field that is set can be read back through its getter.
    #[test]
    fn test_getter() {
        let field = Field {
            data_type: Some(DataType {
                is_nullable: true,
                ..Default::default()
            }),
            name: String::new(),
        };
        let fetched = field.get_data_type().unwrap();
        assert!(fetched.is_nullable);
    }

    /// An enum field holding a valid value is returned by its getter.
    #[test]
    fn test_enum_getter() {
        let double = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        assert_eq!(double.get_type_name().unwrap(), data_type::TypeName::Double);
    }

    /// An enum field holding the unspecified value makes its getter fail.
    #[test]
    fn test_enum_unspecified() {
        let unspecified = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(unspecified.get_type_name().is_err());
    }

    /// The getter of a primitive field returns the default value on a
    /// default-constructed message.
    #[test]
    fn test_primitive_getter() {
        let source = DataType::default();
        let is_nullable = source.get_is_nullable();
        let copied = DataType {
            is_nullable,
            ..Default::default()
        };
        assert!(!copied.is_nullable);
    }

    /// `NodeBody` must stay exactly 16 bytes; checked at compile time.
    #[test]
    fn test_size() {
        use std::mem::size_of;

        static_assertions::const_assert_eq!(size_of::<NodeBody>(), 16);
    }

    /// Covers the three resolution paths of
    /// `get_clean_watermark_index_in_pk_compat`.
    #[test]
    #[expect(deprecated)]
    fn test_get_clean_watermark_index_in_pk_compat() {
        use crate::catalog::Table;
        use crate::common::{ColumnOrder, OrderType};

        fn create_column_order(column_index: u32) -> ColumnOrder {
            ColumnOrder {
                column_index,
                order_type: Some(OrderType::default()),
            }
        }

        // Both fields set: `clean_watermark_indices` wins; columns 3 and 2 sit at
        // pk positions 2 and 1, so the minimum is 1.
        let both_set = Table {
            clean_watermark_indices: vec![3, 2],
            clean_watermark_index_in_pk: Some(0),
            pk: [1, 2, 3, 4].into_iter().map(create_column_order).collect(),
            ..Default::default()
        };
        assert_eq!(both_set.get_clean_watermark_index_in_pk_compat(), Some(1));

        // Only `clean_watermark_index_in_pk` set: the pk position is returned unchanged.
        let only_pk_index = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: Some(1),
            pk: [0, 2].into_iter().map(create_column_order).collect(),
            ..Default::default()
        };
        assert_eq!(
            only_pk_index.get_clean_watermark_index_in_pk_compat(),
            Some(1)
        );

        // Neither field set and no pk: nothing to report.
        let nothing_set = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: None,
            ..Default::default()
        };
        assert_eq!(nothing_set.get_clean_watermark_index_in_pk_compat(), None);
    }
}