1#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17#![allow(clippy::doc_lazy_continuation)]
18#![expect(clippy::doc_markdown)]
20#![expect(clippy::upper_case_acronyms)]
21#![expect(clippy::needless_lifetimes)]
22#![expect(clippy::disallowed_methods)]
24#![expect(clippy::enum_variant_names)]
25#![expect(clippy::module_inception)]
26#![expect(clippy::large_enum_variant)]
28#![feature(step_trait)]
29
30pub mod id;
31
32use std::str::FromStr;
33
34use event_recovery::RecoveryEvent;
35use plan_common::AdditionalColumn;
36pub use prost::Message;
37use risingwave_error::tonic::ToTonicStatus;
38use thiserror::Error;
39
40use crate::common::WorkerType;
41use crate::ddl_service::streaming_job_resource_type;
42use crate::id::{FragmentId, SourceId, WorkerId};
43use crate::meta::event_log::event_recovery;
44use crate::stream_plan::PbStreamScanType;
45
46#[rustfmt::skip]
47#[cfg_attr(madsim, path = "sim/catalog.rs")]
48pub mod catalog;
49#[rustfmt::skip]
50#[cfg_attr(madsim, path = "sim/common.rs")]
51pub mod common;
52#[rustfmt::skip]
53#[cfg_attr(madsim, path = "sim/compute.rs")]
54pub mod compute;
55#[rustfmt::skip]
56#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
57pub mod cloud_service;
58#[rustfmt::skip]
59#[cfg_attr(madsim, path = "sim/data.rs")]
60pub mod data;
61#[rustfmt::skip]
62#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
63pub mod ddl_service;
64#[rustfmt::skip]
65#[cfg_attr(madsim, path = "sim/expr.rs")]
66pub mod expr;
67#[rustfmt::skip]
68#[cfg_attr(madsim, path = "sim/meta.rs")]
69pub mod meta;
70#[rustfmt::skip]
71#[cfg_attr(madsim, path = "sim/plan_common.rs")]
72pub mod plan_common;
73#[rustfmt::skip]
74#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
75pub mod batch_plan;
76#[rustfmt::skip]
77#[cfg_attr(madsim, path = "sim/task_service.rs")]
78pub mod task_service;
79#[rustfmt::skip]
80#[cfg_attr(madsim, path = "sim/connector_service.rs")]
81pub mod connector_service;
82#[rustfmt::skip]
83#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
84pub mod stream_plan;
85#[rustfmt::skip]
86#[cfg_attr(madsim, path = "sim/stream_service.rs")]
87pub mod stream_service;
88#[rustfmt::skip]
89#[cfg_attr(madsim, path = "sim/hummock.rs")]
90pub mod hummock;
91#[rustfmt::skip]
92#[cfg_attr(madsim, path = "sim/compactor.rs")]
93pub mod compactor;
94#[rustfmt::skip]
95#[cfg_attr(madsim, path = "sim/user.rs")]
96pub mod user;
97#[rustfmt::skip]
98#[cfg_attr(madsim, path = "sim/source.rs")]
99pub mod source;
100#[rustfmt::skip]
101#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
102pub mod monitor_service;
103#[rustfmt::skip]
104#[cfg_attr(madsim, path = "sim/backup_service.rs")]
105pub mod backup_service;
106#[rustfmt::skip]
107#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
108pub mod serverless_backfill_controller;
109#[rustfmt::skip]
110#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
111pub mod frontend_service;
112#[rustfmt::skip]
113#[cfg_attr(madsim, path = "sim/java_binding.rs")]
114pub mod java_binding;
115#[rustfmt::skip]
116#[cfg_attr(madsim, path = "sim/health.rs")]
117pub mod health;
118#[rustfmt::skip]
119#[path = "sim/telemetry.rs"]
120pub mod telemetry;
121#[rustfmt::skip]
122#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
123pub mod iceberg_compaction;
124
125#[rustfmt::skip]
126#[path = "sim/secret.rs"]
127pub mod secret;
128#[rustfmt::skip]
129#[path = "connector_service.serde.rs"]
130pub mod connector_service_serde;
131#[rustfmt::skip]
132#[path = "catalog.serde.rs"]
133pub mod catalog_serde;
134#[rustfmt::skip]
135#[path = "common.serde.rs"]
136pub mod common_serde;
137#[rustfmt::skip]
138#[path = "compute.serde.rs"]
139pub mod compute_serde;
140#[rustfmt::skip]
141#[path = "cloud_service.serde.rs"]
142pub mod cloud_service_serde;
143#[rustfmt::skip]
144#[path = "data.serde.rs"]
145pub mod data_serde;
146#[rustfmt::skip]
147#[path = "ddl_service.serde.rs"]
148pub mod ddl_service_serde;
149#[rustfmt::skip]
150#[path = "expr.serde.rs"]
151pub mod expr_serde;
152#[rustfmt::skip]
153#[path = "meta.serde.rs"]
154pub mod meta_serde;
155#[rustfmt::skip]
156#[path = "plan_common.serde.rs"]
157pub mod plan_common_serde;
158#[rustfmt::skip]
159#[path = "batch_plan.serde.rs"]
160pub mod batch_plan_serde;
161#[rustfmt::skip]
162#[path = "task_service.serde.rs"]
163pub mod task_service_serde;
164#[rustfmt::skip]
165#[path = "stream_plan.serde.rs"]
166pub mod stream_plan_serde;
167#[rustfmt::skip]
168#[path = "stream_service.serde.rs"]
169pub mod stream_service_serde;
170#[rustfmt::skip]
171#[path = "hummock.serde.rs"]
172pub mod hummock_serde;
173#[rustfmt::skip]
174#[path = "compactor.serde.rs"]
175pub mod compactor_serde;
176#[rustfmt::skip]
177#[path = "user.serde.rs"]
178pub mod user_serde;
179#[rustfmt::skip]
180#[path = "source.serde.rs"]
181pub mod source_serde;
182#[rustfmt::skip]
183#[path = "monitor_service.serde.rs"]
184pub mod monitor_service_serde;
185#[rustfmt::skip]
186#[path = "backup_service.serde.rs"]
187pub mod backup_service_serde;
188#[rustfmt::skip]
189#[path = "java_binding.serde.rs"]
190pub mod java_binding_serde;
191#[rustfmt::skip]
192#[path = "telemetry.serde.rs"]
193pub mod telemetry_serde;
194
195#[rustfmt::skip]
196#[path = "secret.serde.rs"]
197pub mod secret_serde;
198#[rustfmt::skip]
199#[path = "serverless_backfill_controller.serde.rs"]
200pub mod serverless_backfill_controller_serde;
201
/// Error signalling that a named field was not found.
///
/// The wrapped string is the field name; it is interpolated into the error
/// message as ``field `<name>` not found``.
#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
pub struct PbFieldNotFound(pub &'static str);
205
206impl From<PbFieldNotFound> for tonic::Status {
207 fn from(e: PbFieldNotFound) -> Self {
208 e.to_status_unnamed(tonic::Code::Internal)
209 }
210}
211
212impl FromStr for crate::expr::table_function::PbType {
213 type Err = ();
214
215 fn from_str(s: &str) -> Result<Self, Self::Err> {
216 Self::from_str_name(&s.to_uppercase()).ok_or(())
217 }
218}
219
220impl FromStr for crate::expr::agg_call::PbKind {
221 type Err = ();
222
223 fn from_str(s: &str) -> Result<Self, Self::Err> {
224 Self::from_str_name(&s.to_uppercase()).ok_or(())
225 }
226}
227
228impl stream_plan::MaterializeNode {
229 pub fn dist_key_indices(&self) -> Vec<u32> {
230 self.get_table()
231 .unwrap()
232 .distribution_key
233 .iter()
234 .map(|i| *i as u32)
235 .collect()
236 }
237
238 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
239 self.get_table()
240 .unwrap()
241 .columns
242 .iter()
243 .map(|c| c.get_column_desc().unwrap().clone())
244 .collect()
245 }
246}
247
248impl stream_plan::StreamScanNode {
249 pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
251 self.upstream_column_ids
252 .iter()
253 .map(|id| {
254 (self.table_desc.as_ref().unwrap().columns.iter())
255 .find(|c| c.column_id == *id)
256 .unwrap()
257 .clone()
258 })
259 .collect()
260 }
261}
262
263impl stream_plan::SourceBackfillNode {
264 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
265 self.columns
266 .iter()
267 .map(|c| c.column_desc.as_ref().unwrap().clone())
268 .collect()
269 }
270}
271
272impl common::WorkerNode {
274 pub fn compute_node_parallelism(&self) -> usize {
275 assert_eq!(self.r#type(), WorkerType::ComputeNode);
276 self.property
277 .as_ref()
278 .expect("property should be exist")
279 .parallelism as usize
280 }
281
282 fn compactor_node_parallelism(&self) -> usize {
283 assert_eq!(self.r#type(), WorkerType::Compactor);
284 self.property
285 .as_ref()
286 .expect("property should be exist")
287 .parallelism as usize
288 }
289
290 pub fn parallelism(&self) -> Option<usize> {
291 match self.r#type() {
292 WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
293 WorkerType::Compactor => Some(self.compactor_node_parallelism()),
294 _ => None,
295 }
296 }
297
298 pub fn resource_group(&self) -> Option<String> {
299 self.property
300 .as_ref()
301 .and_then(|p| p.resource_group.clone())
302 }
303}
304
305impl stream_plan::SourceNode {
306 pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
307 Some(
308 self.source_inner
309 .as_ref()?
310 .columns
311 .iter()
312 .map(|c| c.get_column_desc().unwrap().clone())
313 .collect(),
314 )
315 }
316}
317
318impl meta::table_fragments::ActorStatus {
319 pub fn worker_id(&self) -> WorkerId {
320 self.location
321 .as_ref()
322 .expect("actor location should be exist")
323 .worker_node_id
324 }
325}
326
327impl common::ActorLocation {
328 pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
329 Some(Self { worker_node_id })
330 }
331}
332
333impl meta::event_log::EventRecovery {
334 pub fn event_type(&self) -> &str {
335 match self.recovery_event.as_ref() {
336 Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
337 Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
338 Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
339 Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
340 Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
341 Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
342 None => "UNKNOWN_RECOVERY_EVENT",
343 }
344 }
345
346 pub fn database_recovery_start(database_id: u32) -> Self {
347 Self {
348 recovery_event: Some(RecoveryEvent::DatabaseStart(
349 event_recovery::DatabaseRecoveryStart { database_id },
350 )),
351 }
352 }
353
354 pub fn database_recovery_failure(database_id: u32) -> Self {
355 Self {
356 recovery_event: Some(RecoveryEvent::DatabaseFailure(
357 event_recovery::DatabaseRecoveryFailure { database_id },
358 )),
359 }
360 }
361
362 pub fn database_recovery_success(database_id: u32) -> Self {
363 Self {
364 recovery_event: Some(RecoveryEvent::DatabaseSuccess(
365 event_recovery::DatabaseRecoverySuccess { database_id },
366 )),
367 }
368 }
369
370 pub fn global_recovery_start(reason: String) -> Self {
371 Self {
372 recovery_event: Some(RecoveryEvent::GlobalStart(
373 event_recovery::GlobalRecoveryStart { reason },
374 )),
375 }
376 }
377
378 pub fn global_recovery_success(
379 reason: String,
380 duration_secs: f32,
381 running_database_ids: Vec<u32>,
382 recovering_database_ids: Vec<u32>,
383 ) -> Self {
384 Self {
385 recovery_event: Some(RecoveryEvent::GlobalSuccess(
386 event_recovery::GlobalRecoverySuccess {
387 reason,
388 duration_secs,
389 running_database_ids,
390 recovering_database_ids,
391 },
392 )),
393 }
394 }
395
396 pub fn global_recovery_failure(reason: String, error: String) -> Self {
397 Self {
398 recovery_event: Some(RecoveryEvent::GlobalFailure(
399 event_recovery::GlobalRecoveryFailure { reason, error },
400 )),
401 }
402 }
403}
404
405impl stream_plan::StreamNode {
406 pub fn find_stream_source(&self) -> Option<SourceId> {
410 if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
411 self.node_body.as_ref()
412 && let Some(inner) = &source.source_inner
413 {
414 return Some(inner.source_id);
415 }
416
417 for child in &self.input {
418 if let Some(source) = child.find_stream_source() {
419 return Some(source);
420 }
421 }
422
423 None
424 }
425
426 pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
434 if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
435 self.node_body.as_ref()
436 {
437 if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
438 self.input[0].node_body.as_ref().unwrap()
439 {
440 return Some((source.upstream_source_id, merge.upstream_fragment_id));
443 } else {
444 unreachable!(
445 "source backfill must have a merge node as its input: {:?}",
446 self
447 );
448 }
449 }
450
451 for child in &self.input {
452 if let Some(source) = child.find_source_backfill() {
453 return Some(source);
454 }
455 }
456
457 None
458 }
459}
460impl stream_plan::Dispatcher {
461 pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
462 stream_plan::DispatchStrategy {
463 r#type: self.r#type,
464 dist_key_indices: self.dist_key_indices.clone(),
465 output_mapping: self.output_mapping.clone(),
466 }
467 }
468}
469
470impl stream_plan::DispatchOutputMapping {
471 pub fn identical(len: usize) -> Self {
473 Self {
474 indices: (0..len as u32).collect(),
475 types: Vec::new(),
476 }
477 }
478
479 pub fn simple(indices: Vec<u32>) -> Self {
481 Self {
482 indices,
483 types: Vec::new(),
484 }
485 }
486
487 pub fn into_simple_indices(self) -> Vec<u32> {
489 assert!(
490 self.types.is_empty(),
491 "types must be empty for simple mapping"
492 );
493 self.indices
494 }
495}
496
impl catalog::StreamSourceInfo {
    /// Returns the value of the `cdc_source_job` flag.
    // NOTE(review): the method name suggests that flag doubles as the "shared
    // source" marker — confirm against the field's proto documentation.
    pub fn is_shared(&self) -> bool {
        self.cdc_source_job
    }
}
503
504impl stream_plan::PbStreamScanType {
505 pub fn is_reschedulable(&self, is_online: bool) -> bool {
506 match self {
507 PbStreamScanType::Unspecified => {
508 unreachable!()
509 }
510 PbStreamScanType::UpstreamOnly => false,
512 PbStreamScanType::ArrangementBackfill => true,
513 PbStreamScanType::CrossDbSnapshotBackfill => true,
514 PbStreamScanType::SnapshotBackfill => !is_online,
515 PbStreamScanType::Chain | PbStreamScanType::Rearrange | PbStreamScanType::Backfill => {
516 false
517 }
518 }
519 }
520}
521
522impl catalog::Sink {
523 pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
526
527 pub fn unique_identity(&self) -> String {
528 format!("{}", self.id)
530 }
531
532 #[allow(deprecated)]
536 pub fn ignore_delete(&self) -> bool {
537 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
538 }
539}
540
541impl stream_plan::SinkDesc {
542 #[allow(deprecated)]
546 pub fn ignore_delete(&self) -> bool {
547 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
548 }
549}
550
551impl connector_service::SinkParam {
552 #[allow(deprecated)]
556 pub fn ignore_delete(&self) -> bool {
557 self.raw_ignore_delete || self.sink_type() == catalog::SinkType::ForceAppendOnly
558 }
559}
560
561impl catalog::Table {
562 #[expect(deprecated)]
573 pub fn get_clean_watermark_column_indices(&self) -> Vec<u32> {
574 if !self.clean_watermark_indices.is_empty() {
575 self.clean_watermark_indices.clone()
577 } else if let Some(pk_idx) = self
578 .clean_watermark_index_in_pk
579 .or_else(|| (!self.pk.is_empty()).then_some(0))
581 {
582 if let Some(col_order) = self.pk.get(pk_idx as usize) {
585 vec![col_order.column_index]
586 } else {
587 if cfg!(debug_assertions) {
588 panic!("clean_watermark_index_in_pk is out of range: {self:?}");
589 }
590 vec![]
591 }
592 } else {
593 vec![]
594 }
595 }
596
597 pub fn get_clean_watermark_index_in_pk_compat(&self) -> Option<usize> {
610 let clean_watermark_column_indices = self.get_clean_watermark_column_indices();
611
612 let clean_watermark_indices_in_pk: Vec<usize> = clean_watermark_column_indices
614 .iter()
615 .filter_map(|&col_idx| {
616 self.pk
617 .iter()
618 .position(|col_order| col_order.column_index == col_idx)
619 })
620 .collect();
621
622 clean_watermark_indices_in_pk.iter().min().copied()
624 }
625}
626
627impl std::fmt::Debug for meta::SystemParams {
628 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632 f.debug_struct("SystemParams").finish_non_exhaustive()
633 }
634}
635
636impl std::fmt::Debug for data::DataType {
639 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
640 let data::DataType {
641 precision,
642 scale,
643 interval_type,
644 field_type,
645 field_names,
646 field_ids,
647 type_name,
648 is_nullable: _,
650 } = self;
651
652 let type_name = data::data_type::TypeName::try_from(*type_name)
653 .map(|t| t.as_str_name())
654 .unwrap_or("Unknown");
655
656 let mut s = f.debug_struct(type_name);
657 if self.precision != 0 {
658 s.field("precision", precision);
659 }
660 if self.scale != 0 {
661 s.field("scale", scale);
662 }
663 if self.interval_type != 0 {
664 s.field("interval_type", interval_type);
665 }
666 if !self.field_type.is_empty() {
667 s.field("field_type", field_type);
668 }
669 if !self.field_names.is_empty() {
670 s.field("field_names", field_names);
671 }
672 if !self.field_ids.is_empty() {
673 s.field("field_ids", field_ids);
674 }
675 s.finish()
676 }
677}
678
679impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
680 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681 match self {
682 Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
683 Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
684 }
685 }
686}
687
688impl std::fmt::Debug for plan_common::ColumnDesc {
689 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690 let plan_common::ColumnDesc {
692 column_type,
693 column_id,
694 name,
695 description,
696 additional_column_type,
697 additional_column,
698 generated_or_default_column,
699 version,
700 nullable,
701 } = self;
702
703 let mut s = f.debug_struct("ColumnDesc");
704 if let Some(column_type) = column_type {
705 s.field("column_type", column_type);
706 } else {
707 s.field("column_type", &"Unknown");
708 }
709 s.field("column_id", column_id).field("name", name);
710 if let Some(description) = description {
711 s.field("description", description);
712 }
713 if self.additional_column_type != 0 {
714 s.field("additional_column_type", additional_column_type);
715 }
716 s.field("version", version);
717 if let Some(AdditionalColumn {
718 column_type: Some(column_type),
719 }) = additional_column
720 {
721 s.field("additional_column", &column_type);
723 }
724 if let Some(generated_or_default_column) = generated_or_default_column {
725 s.field("generated_or_default_column", &generated_or_default_column);
726 }
727 s.field("nullable", nullable);
728 s.finish()
729 }
730}
731
732impl expr::UserDefinedFunction {
733 pub fn name_in_runtime(&self) -> Option<&str> {
734 if self.version() < expr::UdfExprVersion::NameInRuntime {
735 if self.language == "rust" || self.language == "wasm" {
736 Some(&self.name)
740 } else {
741 self.identifier.as_deref()
743 }
744 } else {
745 self.identifier.as_deref()
747 }
748 }
749}
750
751impl expr::UserDefinedFunctionMetadata {
752 pub fn name_in_runtime(&self) -> Option<&str> {
753 if self.version() < expr::UdfExprVersion::NameInRuntime {
754 if self.language == "rust" || self.language == "wasm" {
755 let old_identifier = self
760 .identifier
761 .as_ref()
762 .expect("Rust/WASM UDF must have identifier");
763 Some(
764 old_identifier
765 .split_once("(")
766 .expect("the old identifier must contain `(`")
767 .0,
768 )
769 } else {
770 self.identifier.as_deref()
772 }
773 } else {
774 self.identifier.as_deref()
776 }
777 }
778}
779
780impl streaming_job_resource_type::ResourceType {
781 pub fn resource_group(&self) -> Option<String> {
782 match self {
783 streaming_job_resource_type::ResourceType::Regular(_) => None,
784 streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
785 | streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group) => {
786 Some(group.clone())
787 }
788 }
789 }
790}
791
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    /// The message getter exposes the nested message that was set.
    #[test]
    fn test_getter() {
        let field = Field {
            data_type: Some(DataType {
                is_nullable: true,
                ..Default::default()
            }),
            name: String::new(),
        };
        assert!(field.get_data_type().unwrap().is_nullable);
    }

    /// The enum getter decodes a valid raw enum value.
    #[test]
    fn test_enum_getter() {
        let data_type = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        assert_eq!(
            data_type::TypeName::Double,
            data_type.get_type_name().unwrap()
        );
    }

    /// The enum getter reports an error for the unspecified value.
    #[test]
    fn test_enum_unspecified() {
        let data_type = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(data_type.get_type_name().is_err());
    }

    /// The primitive getter returns the default value of an unset field.
    #[test]
    fn test_primitive_getter() {
        let source = DataType::default();
        let copied = DataType {
            is_nullable: source.get_is_nullable(),
            ..Default::default()
        };
        assert!(!copied.is_nullable);
    }

    /// `NodeBody` must stay 16 bytes in size (checked at compile time).
    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;
        const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
    }

    /// Covers the three resolution paths of
    /// `get_clean_watermark_index_in_pk_compat`.
    #[test]
    #[expect(deprecated)]
    fn test_get_clean_watermark_index_in_pk_compat() {
        use crate::catalog::Table;
        use crate::common::{ColumnOrder, OrderType};

        fn create_column_order(column_index: u32) -> ColumnOrder {
            ColumnOrder {
                column_index,
                order_type: Some(OrderType::default()),
            }
        }

        // `clean_watermark_indices` wins over `clean_watermark_index_in_pk`;
        // the smallest pk position among columns 3 and 2 is 1.
        let with_indices = Table {
            clean_watermark_indices: vec![3, 2],
            clean_watermark_index_in_pk: Some(0),
            pk: [1, 2, 3, 4].into_iter().map(create_column_order).collect(),
            ..Default::default()
        };
        assert_eq!(
            with_indices.get_clean_watermark_index_in_pk_compat(),
            Some(1)
        );

        // Only `clean_watermark_index_in_pk` is set: it is returned unchanged.
        let with_index_in_pk = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: Some(1),
            pk: [0, 2].into_iter().map(create_column_order).collect(),
            ..Default::default()
        };
        assert_eq!(
            with_index_in_pk.get_clean_watermark_index_in_pk_compat(),
            Some(1)
        );

        // Nothing is set and the pk is empty: there is no watermark column.
        let without_watermark = Table {
            clean_watermark_indices: vec![],
            clean_watermark_index_in_pk: None,
            ..Default::default()
        };
        assert_eq!(
            without_watermark.get_clean_watermark_index_in_pk_compat(),
            None
        );
    }
}