// Crate-level lint configuration.
//
// `#![expect(...)]` emits a warning when the expected lint never fires.
// Allowing `unfulfilled_lint_expectations` silences that warning, so the
// expectations below are harmless when a lint happens not to trigger.
#![allow(unfulfilled_lint_expectations)]
#![allow(clippy::doc_overindented_list_items)]
// Clippy lints that are expected to fire somewhere in this crate
// (presumably inside the modules declared below -- TODO confirm).
#![expect(clippy::doc_markdown)]
#![expect(clippy::upper_case_acronyms)]
#![expect(clippy::needless_lifetimes)]
#![expect(clippy::disallowed_methods)]
#![expect(clippy::enum_variant_names)]
#![expect(clippy::module_inception)]
#![expect(clippy::large_enum_variant)]
27
28use std::str::FromStr;
29
30use event_recovery::RecoveryEvent;
31use plan_common::AdditionalColumn;
32pub use prost::Message;
33use risingwave_error::tonic::ToTonicStatus;
34use thiserror::Error;
35
36use crate::common::WorkerType;
37use crate::meta::event_log::event_recovery;
38use crate::stream_plan::PbStreamScanType;
39
// Top-level modules. Every declaration is marked `#[rustfmt::skip]`, and when
// the `madsim` cfg is set its source file is taken from `sim/<name>.rs`
// instead of the default module path.
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/catalog.rs")]
pub mod catalog;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/common.rs")]
pub mod common;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/compute.rs")]
pub mod compute;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
pub mod cloud_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/data.rs")]
pub mod data;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
pub mod ddl_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/expr.rs")]
pub mod expr;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/meta.rs")]
pub mod meta;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/plan_common.rs")]
pub mod plan_common;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
pub mod batch_plan;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/task_service.rs")]
pub mod task_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/connector_service.rs")]
pub mod connector_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
pub mod stream_plan;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/stream_service.rs")]
pub mod stream_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/hummock.rs")]
pub mod hummock;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/compactor.rs")]
pub mod compactor;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/user.rs")]
pub mod user;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/source.rs")]
pub mod source;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
pub mod monitor_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/backup_service.rs")]
pub mod backup_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
pub mod serverless_backfill_controller;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
pub mod frontend_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/java_binding.rs")]
pub mod java_binding;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/health.rs")]
pub mod health;
// Unlike its neighbours, `telemetry` uses a plain `#[path]`: it is always
// loaded from `sim/telemetry.rs`, with or without the `madsim` cfg.
#[rustfmt::skip]
#[path = "sim/telemetry.rs"]
pub mod telemetry;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
pub mod iceberg_compaction;

// `secret` is likewise always loaded from `sim/secret.rs`.
#[rustfmt::skip]
#[path = "sim/secret.rs"]
pub mod secret;
// Companion `*_serde` modules, each loaded from the matching
// `<name>.serde.rs` file (presumably the serde implementations for the module
// of the same name -- TODO confirm).
#[rustfmt::skip]
#[path = "connector_service.serde.rs"]
pub mod connector_service_serde;
#[rustfmt::skip]
#[path = "catalog.serde.rs"]
pub mod catalog_serde;
#[rustfmt::skip]
#[path = "common.serde.rs"]
pub mod common_serde;
#[rustfmt::skip]
#[path = "compute.serde.rs"]
pub mod compute_serde;
#[rustfmt::skip]
#[path = "cloud_service.serde.rs"]
pub mod cloud_service_serde;
#[rustfmt::skip]
#[path = "data.serde.rs"]
pub mod data_serde;
#[rustfmt::skip]
#[path = "ddl_service.serde.rs"]
pub mod ddl_service_serde;
#[rustfmt::skip]
#[path = "expr.serde.rs"]
pub mod expr_serde;
#[rustfmt::skip]
#[path = "meta.serde.rs"]
pub mod meta_serde;
#[rustfmt::skip]
#[path = "plan_common.serde.rs"]
pub mod plan_common_serde;
#[rustfmt::skip]
#[path = "batch_plan.serde.rs"]
pub mod batch_plan_serde;
#[rustfmt::skip]
#[path = "task_service.serde.rs"]
pub mod task_service_serde;
#[rustfmt::skip]
#[path = "stream_plan.serde.rs"]
pub mod stream_plan_serde;
#[rustfmt::skip]
#[path = "stream_service.serde.rs"]
pub mod stream_service_serde;
#[rustfmt::skip]
#[path = "hummock.serde.rs"]
pub mod hummock_serde;
#[rustfmt::skip]
#[path = "compactor.serde.rs"]
pub mod compactor_serde;
#[rustfmt::skip]
#[path = "user.serde.rs"]
pub mod user_serde;
#[rustfmt::skip]
#[path = "source.serde.rs"]
pub mod source_serde;
#[rustfmt::skip]
#[path = "monitor_service.serde.rs"]
pub mod monitor_service_serde;
#[rustfmt::skip]
#[path = "backup_service.serde.rs"]
pub mod backup_service_serde;
#[rustfmt::skip]
#[path = "java_binding.serde.rs"]
pub mod java_binding_serde;
#[rustfmt::skip]
#[path = "telemetry.serde.rs"]
pub mod telemetry_serde;

#[rustfmt::skip]
#[path = "secret.serde.rs"]
pub mod secret_serde;
#[rustfmt::skip]
#[path = "serverless_backfill_controller.serde.rs"]
pub mod serverless_backfill_controller_serde;
195
/// Error reporting that a field could not be found.
///
/// The wrapped `&'static str` is the field name; it is interpolated into the
/// error message as ``field `<name>` not found``.
#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
pub struct PbFieldNotFound(pub &'static str);
199
200impl From<PbFieldNotFound> for tonic::Status {
201 fn from(e: PbFieldNotFound) -> Self {
202 e.to_status_unnamed(tonic::Code::Internal)
203 }
204}
205
206impl FromStr for crate::expr::table_function::PbType {
207 type Err = ();
208
209 fn from_str(s: &str) -> Result<Self, Self::Err> {
210 Self::from_str_name(&s.to_uppercase()).ok_or(())
211 }
212}
213
214impl FromStr for crate::expr::agg_call::PbKind {
215 type Err = ();
216
217 fn from_str(s: &str) -> Result<Self, Self::Err> {
218 Self::from_str_name(&s.to_uppercase()).ok_or(())
219 }
220}
221
222impl stream_plan::MaterializeNode {
223 pub fn dist_key_indices(&self) -> Vec<u32> {
224 self.get_table()
225 .unwrap()
226 .distribution_key
227 .iter()
228 .map(|i| *i as u32)
229 .collect()
230 }
231
232 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
233 self.get_table()
234 .unwrap()
235 .columns
236 .iter()
237 .map(|c| c.get_column_desc().unwrap().clone())
238 .collect()
239 }
240}
241
242impl stream_plan::StreamScanNode {
243 pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
245 self.upstream_column_ids
246 .iter()
247 .map(|id| {
248 (self.table_desc.as_ref().unwrap().columns.iter())
249 .find(|c| c.column_id == *id)
250 .unwrap()
251 .clone()
252 })
253 .collect()
254 }
255}
256
257impl stream_plan::SourceBackfillNode {
258 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
259 self.columns
260 .iter()
261 .map(|c| c.column_desc.as_ref().unwrap().clone())
262 .collect()
263 }
264}
265
266impl common::WorkerNode {
268 pub fn compute_node_parallelism(&self) -> usize {
269 assert_eq!(self.r#type(), WorkerType::ComputeNode);
270 self.property
271 .as_ref()
272 .expect("property should be exist")
273 .parallelism as usize
274 }
275
276 pub fn parallelism(&self) -> Option<usize> {
277 if WorkerType::ComputeNode == self.r#type() {
278 Some(self.compute_node_parallelism())
279 } else {
280 None
281 }
282 }
283
284 pub fn resource_group(&self) -> Option<String> {
285 self.property
286 .as_ref()
287 .and_then(|p| p.resource_group.clone())
288 }
289}
290
291impl stream_plan::SourceNode {
292 pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
293 Some(
294 self.source_inner
295 .as_ref()?
296 .columns
297 .iter()
298 .map(|c| c.get_column_desc().unwrap().clone())
299 .collect(),
300 )
301 }
302}
303
304impl meta::table_fragments::ActorStatus {
305 pub fn worker_id(&self) -> u32 {
306 self.location
307 .as_ref()
308 .expect("actor location should be exist")
309 .worker_node_id
310 }
311}
312
313impl common::WorkerNode {
314 pub fn is_streaming_schedulable(&self) -> bool {
315 let property = self.property.as_ref();
316 property.is_some_and(|p| p.is_streaming) && !property.is_some_and(|p| p.is_unschedulable)
317 }
318}
319
320impl common::ActorLocation {
321 pub fn from_worker(worker_node_id: u32) -> Option<Self> {
322 Some(Self { worker_node_id })
323 }
324}
325
326impl meta::event_log::EventRecovery {
327 pub fn event_type(&self) -> &str {
328 match self.recovery_event.as_ref() {
329 Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
330 Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
331 Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
332 Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
333 Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
334 Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
335 None => "UNKNOWN_RECOVERY_EVENT",
336 }
337 }
338
339 pub fn database_recovery_start(database_id: u32) -> Self {
340 Self {
341 recovery_event: Some(RecoveryEvent::DatabaseStart(
342 event_recovery::DatabaseRecoveryStart { database_id },
343 )),
344 }
345 }
346
347 pub fn database_recovery_failure(database_id: u32) -> Self {
348 Self {
349 recovery_event: Some(RecoveryEvent::DatabaseFailure(
350 event_recovery::DatabaseRecoveryFailure { database_id },
351 )),
352 }
353 }
354
355 pub fn database_recovery_success(database_id: u32) -> Self {
356 Self {
357 recovery_event: Some(RecoveryEvent::DatabaseSuccess(
358 event_recovery::DatabaseRecoverySuccess { database_id },
359 )),
360 }
361 }
362
363 pub fn global_recovery_start(reason: String) -> Self {
364 Self {
365 recovery_event: Some(RecoveryEvent::GlobalStart(
366 event_recovery::GlobalRecoveryStart { reason },
367 )),
368 }
369 }
370
371 pub fn global_recovery_success(
372 reason: String,
373 duration_secs: f32,
374 running_database_ids: Vec<u32>,
375 recovering_database_ids: Vec<u32>,
376 ) -> Self {
377 Self {
378 recovery_event: Some(RecoveryEvent::GlobalSuccess(
379 event_recovery::GlobalRecoverySuccess {
380 reason,
381 duration_secs,
382 running_database_ids,
383 recovering_database_ids,
384 },
385 )),
386 }
387 }
388
389 pub fn global_recovery_failure(reason: String, error: String) -> Self {
390 Self {
391 recovery_event: Some(RecoveryEvent::GlobalFailure(
392 event_recovery::GlobalRecoveryFailure { reason, error },
393 )),
394 }
395 }
396}
397
398impl stream_plan::StreamNode {
399 pub fn find_stream_source(&self) -> Option<u32> {
403 if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
404 self.node_body.as_ref()
405 {
406 if let Some(inner) = &source.source_inner {
407 return Some(inner.source_id);
408 }
409 }
410
411 for child in &self.input {
412 if let Some(source) = child.find_stream_source() {
413 return Some(source);
414 }
415 }
416
417 None
418 }
419
420 pub fn find_source_backfill(&self) -> Option<(u32, u32)> {
428 if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
429 self.node_body.as_ref()
430 {
431 if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
432 self.input[0].node_body.as_ref().unwrap()
433 {
434 return Some((source.upstream_source_id, merge.upstream_fragment_id));
437 } else {
438 unreachable!(
439 "source backfill must have a merge node as its input: {:?}",
440 self
441 );
442 }
443 }
444
445 for child in &self.input {
446 if let Some(source) = child.find_source_backfill() {
447 return Some(source);
448 }
449 }
450
451 None
452 }
453}
454
455impl stream_plan::FragmentTypeFlag {
456 pub fn backfill_rate_limit_fragments() -> i32 {
458 stream_plan::FragmentTypeFlag::SourceScan as i32
459 | stream_plan::FragmentTypeFlag::StreamScan as i32
460 }
461
462 pub fn source_rate_limit_fragments() -> i32 {
465 stream_plan::FragmentTypeFlag::Source as i32 | stream_plan::FragmentTypeFlag::FsFetch as i32
466 }
467
468 pub fn sink_rate_limit_fragments() -> i32 {
470 stream_plan::FragmentTypeFlag::Sink as i32
471 }
472
473 pub fn rate_limit_fragments() -> i32 {
475 Self::backfill_rate_limit_fragments()
476 | Self::source_rate_limit_fragments()
477 | Self::sink_rate_limit_fragments()
478 }
479
480 pub fn dml_rate_limit_fragments() -> i32 {
481 stream_plan::FragmentTypeFlag::Dml as i32
482 }
483}
484
485impl stream_plan::Dispatcher {
486 pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
487 stream_plan::DispatchStrategy {
488 r#type: self.r#type,
489 dist_key_indices: self.dist_key_indices.clone(),
490 output_mapping: self.output_mapping.clone(),
491 }
492 }
493}
494
495impl stream_plan::DispatchOutputMapping {
496 pub fn identical(len: usize) -> Self {
498 Self {
499 indices: (0..len as u32).collect(),
500 types: Vec::new(),
501 }
502 }
503
504 pub fn simple(indices: Vec<u32>) -> Self {
506 Self {
507 indices,
508 types: Vec::new(),
509 }
510 }
511
512 pub fn into_simple_indices(self) -> Vec<u32> {
514 assert!(
515 self.types.is_empty(),
516 "types must be empty for simple mapping"
517 );
518 self.indices
519 }
520}
521
522impl catalog::StreamSourceInfo {
523 pub fn is_shared(&self) -> bool {
525 self.cdc_source_job
526 }
527}
528
529impl stream_plan::PbStreamScanType {
530 pub fn is_reschedulable(&self) -> bool {
531 match self {
532 PbStreamScanType::UpstreamOnly => false,
534 PbStreamScanType::ArrangementBackfill => true,
535 PbStreamScanType::CrossDbSnapshotBackfill => true,
536 PbStreamScanType::SnapshotBackfill => true,
537 _ => false,
538 }
539 }
540}
541
542impl catalog::Sink {
543 pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
546
547 pub fn unique_identity(&self) -> String {
548 format!("{}", self.id)
550 }
551}
552
553impl std::fmt::Debug for meta::SystemParams {
554 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
558 f.debug_struct("SystemParams").finish_non_exhaustive()
559 }
560}
561
562impl std::fmt::Debug for data::DataType {
565 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
566 let data::DataType {
567 precision,
568 scale,
569 interval_type,
570 field_type,
571 field_names,
572 field_ids,
573 type_name,
574 is_nullable: _,
576 } = self;
577
578 let type_name = data::data_type::TypeName::try_from(*type_name)
579 .map(|t| t.as_str_name())
580 .unwrap_or("Unknown");
581
582 let mut s = f.debug_struct(type_name);
583 if self.precision != 0 {
584 s.field("precision", precision);
585 }
586 if self.scale != 0 {
587 s.field("scale", scale);
588 }
589 if self.interval_type != 0 {
590 s.field("interval_type", interval_type);
591 }
592 if !self.field_type.is_empty() {
593 s.field("field_type", field_type);
594 }
595 if !self.field_names.is_empty() {
596 s.field("field_names", field_names);
597 }
598 if !self.field_ids.is_empty() {
599 s.field("field_ids", field_ids);
600 }
601 s.finish()
602 }
603}
604
605impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
606 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607 match self {
608 Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
609 Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
610 }
611 }
612}
613
614impl std::fmt::Debug for plan_common::ColumnDesc {
615 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
616 let plan_common::ColumnDesc {
618 column_type,
619 column_id,
620 name,
621 description,
622 additional_column_type,
623 additional_column,
624 generated_or_default_column,
625 version,
626 nullable,
627 } = self;
628
629 let mut s = f.debug_struct("ColumnDesc");
630 if let Some(column_type) = column_type {
631 s.field("column_type", column_type);
632 } else {
633 s.field("column_type", &"Unknown");
634 }
635 s.field("column_id", column_id).field("name", name);
636 if let Some(description) = description {
637 s.field("description", description);
638 }
639 if self.additional_column_type != 0 {
640 s.field("additional_column_type", additional_column_type);
641 }
642 s.field("version", version);
643 if let Some(AdditionalColumn {
644 column_type: Some(column_type),
645 }) = additional_column
646 {
647 s.field("additional_column", &column_type);
649 }
650 if let Some(generated_or_default_column) = generated_or_default_column {
651 s.field("generated_or_default_column", &generated_or_default_column);
652 }
653 s.field("nullable", nullable);
654 s.finish()
655 }
656}
657
658impl expr::UserDefinedFunction {
659 pub fn name_in_runtime(&self) -> Option<&str> {
660 if self.version() < expr::UdfExprVersion::NameInRuntime {
661 if self.language == "rust" || self.language == "wasm" {
662 Some(&self.name)
666 } else {
667 self.identifier.as_deref()
669 }
670 } else {
671 self.identifier.as_deref()
673 }
674 }
675}
676
677impl expr::UserDefinedFunctionMetadata {
678 pub fn name_in_runtime(&self) -> Option<&str> {
679 if self.version() < expr::UdfExprVersion::NameInRuntime {
680 if self.language == "rust" || self.language == "wasm" {
681 let old_identifier = self
686 .identifier
687 .as_ref()
688 .expect("Rust/WASM UDF must have identifier");
689 Some(
690 old_identifier
691 .split_once("(")
692 .expect("the old identifier must contain `(`")
693 .0,
694 )
695 } else {
696 self.identifier.as_deref()
698 }
699 } else {
700 self.identifier.as_deref()
702 }
703 }
704}
705
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    /// A message-typed field can be read back through its getter.
    #[test]
    fn test_getter() {
        let field = Field {
            data_type: Some(DataType {
                is_nullable: true,
                ..Default::default()
            }),
            name: String::new(),
        };
        let fetched = field.get_data_type().unwrap();
        assert!(fetched.is_nullable);
    }

    /// An enum field set to a concrete variant is returned by its getter.
    #[test]
    fn test_enum_getter() {
        let double = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        let fetched = double.get_type_name().unwrap();
        assert_eq!(data_type::TypeName::Double, fetched);
    }

    /// The enum getter returns an error for the `TypeUnspecified` variant.
    #[test]
    fn test_enum_unspecified() {
        let unspecified = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(unspecified.get_type_name().is_err());
    }

    /// The getter of a primitive field returns the default value (`false`)
    /// for a default-constructed message.
    #[test]
    fn test_primitive_getter() {
        let default_type = DataType::default();
        let copied = DataType {
            is_nullable: default_type.get_is_nullable(),
            ..Default::default()
        };
        assert!(!copied.is_nullable);
    }

    /// Compile-time check that `NodeBody` is exactly 16 bytes.
    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;
        const EXPECTED_NODE_BODY_SIZE: usize = 16;
        const_assert_eq!(std::mem::size_of::<NodeBody>(), EXPECTED_NODE_BODY_SIZE);
    }
}