1#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17#![expect(clippy::doc_markdown)]
19#![expect(clippy::upper_case_acronyms)]
20#![expect(clippy::needless_lifetimes)]
21#![expect(clippy::disallowed_methods)]
23#![expect(clippy::enum_variant_names)]
24#![expect(clippy::module_inception)]
25#![expect(clippy::large_enum_variant)]
27
28pub mod id;
29
30use std::str::FromStr;
31
32use event_recovery::RecoveryEvent;
33use plan_common::AdditionalColumn;
34pub use prost::Message;
35use risingwave_error::tonic::ToTonicStatus;
36use thiserror::Error;
37
38use crate::common::WorkerType;
39use crate::id::{FragmentId, SourceId, WorkerId};
40use crate::meta::event_log::event_recovery;
41use crate::stream_plan::PbStreamScanType;
42
43#[rustfmt::skip]
44#[cfg_attr(madsim, path = "sim/catalog.rs")]
45pub mod catalog;
46#[rustfmt::skip]
47#[cfg_attr(madsim, path = "sim/common.rs")]
48pub mod common;
49#[rustfmt::skip]
50#[cfg_attr(madsim, path = "sim/compute.rs")]
51pub mod compute;
52#[rustfmt::skip]
53#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
54pub mod cloud_service;
55#[rustfmt::skip]
56#[cfg_attr(madsim, path = "sim/data.rs")]
57pub mod data;
58#[rustfmt::skip]
59#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
60pub mod ddl_service;
61#[rustfmt::skip]
62#[cfg_attr(madsim, path = "sim/expr.rs")]
63pub mod expr;
64#[rustfmt::skip]
65#[cfg_attr(madsim, path = "sim/meta.rs")]
66pub mod meta;
67#[rustfmt::skip]
68#[cfg_attr(madsim, path = "sim/plan_common.rs")]
69pub mod plan_common;
70#[rustfmt::skip]
71#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
72pub mod batch_plan;
73#[rustfmt::skip]
74#[cfg_attr(madsim, path = "sim/task_service.rs")]
75pub mod task_service;
76#[rustfmt::skip]
77#[cfg_attr(madsim, path = "sim/connector_service.rs")]
78pub mod connector_service;
79#[rustfmt::skip]
80#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
81pub mod stream_plan;
82#[rustfmt::skip]
83#[cfg_attr(madsim, path = "sim/stream_service.rs")]
84pub mod stream_service;
85#[rustfmt::skip]
86#[cfg_attr(madsim, path = "sim/hummock.rs")]
87pub mod hummock;
88#[rustfmt::skip]
89#[cfg_attr(madsim, path = "sim/compactor.rs")]
90pub mod compactor;
91#[rustfmt::skip]
92#[cfg_attr(madsim, path = "sim/user.rs")]
93pub mod user;
94#[rustfmt::skip]
95#[cfg_attr(madsim, path = "sim/source.rs")]
96pub mod source;
97#[rustfmt::skip]
98#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
99pub mod monitor_service;
100#[rustfmt::skip]
101#[cfg_attr(madsim, path = "sim/backup_service.rs")]
102pub mod backup_service;
103#[rustfmt::skip]
104#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
105pub mod serverless_backfill_controller;
106#[rustfmt::skip]
107#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
108pub mod frontend_service;
109#[rustfmt::skip]
110#[cfg_attr(madsim, path = "sim/java_binding.rs")]
111pub mod java_binding;
112#[rustfmt::skip]
113#[cfg_attr(madsim, path = "sim/health.rs")]
114pub mod health;
115#[rustfmt::skip]
116#[path = "sim/telemetry.rs"]
117pub mod telemetry;
118#[rustfmt::skip]
119#[cfg_attr(madsim, path = "sim/iceberg_compaction.rs")]
120pub mod iceberg_compaction;
121
122#[rustfmt::skip]
123#[path = "sim/secret.rs"]
124pub mod secret;
125#[rustfmt::skip]
126#[path = "connector_service.serde.rs"]
127pub mod connector_service_serde;
128#[rustfmt::skip]
129#[path = "catalog.serde.rs"]
130pub mod catalog_serde;
131#[rustfmt::skip]
132#[path = "common.serde.rs"]
133pub mod common_serde;
134#[rustfmt::skip]
135#[path = "compute.serde.rs"]
136pub mod compute_serde;
137#[rustfmt::skip]
138#[path = "cloud_service.serde.rs"]
139pub mod cloud_service_serde;
140#[rustfmt::skip]
141#[path = "data.serde.rs"]
142pub mod data_serde;
143#[rustfmt::skip]
144#[path = "ddl_service.serde.rs"]
145pub mod ddl_service_serde;
146#[rustfmt::skip]
147#[path = "expr.serde.rs"]
148pub mod expr_serde;
149#[rustfmt::skip]
150#[path = "meta.serde.rs"]
151pub mod meta_serde;
152#[rustfmt::skip]
153#[path = "plan_common.serde.rs"]
154pub mod plan_common_serde;
155#[rustfmt::skip]
156#[path = "batch_plan.serde.rs"]
157pub mod batch_plan_serde;
158#[rustfmt::skip]
159#[path = "task_service.serde.rs"]
160pub mod task_service_serde;
161#[rustfmt::skip]
162#[path = "stream_plan.serde.rs"]
163pub mod stream_plan_serde;
164#[rustfmt::skip]
165#[path = "stream_service.serde.rs"]
166pub mod stream_service_serde;
167#[rustfmt::skip]
168#[path = "hummock.serde.rs"]
169pub mod hummock_serde;
170#[rustfmt::skip]
171#[path = "compactor.serde.rs"]
172pub mod compactor_serde;
173#[rustfmt::skip]
174#[path = "user.serde.rs"]
175pub mod user_serde;
176#[rustfmt::skip]
177#[path = "source.serde.rs"]
178pub mod source_serde;
179#[rustfmt::skip]
180#[path = "monitor_service.serde.rs"]
181pub mod monitor_service_serde;
182#[rustfmt::skip]
183#[path = "backup_service.serde.rs"]
184pub mod backup_service_serde;
185#[rustfmt::skip]
186#[path = "java_binding.serde.rs"]
187pub mod java_binding_serde;
188#[rustfmt::skip]
189#[path = "telemetry.serde.rs"]
190pub mod telemetry_serde;
191
192#[rustfmt::skip]
193#[path = "secret.serde.rs"]
194pub mod secret_serde;
195#[rustfmt::skip]
196#[path = "serverless_backfill_controller.serde.rs"]
197pub mod serverless_backfill_controller_serde;
198
199#[derive(Clone, PartialEq, Eq, Debug, Error)]
200#[error("field `{0}` not found")]
201pub struct PbFieldNotFound(pub &'static str);
202
203impl From<PbFieldNotFound> for tonic::Status {
204 fn from(e: PbFieldNotFound) -> Self {
205 e.to_status_unnamed(tonic::Code::Internal)
206 }
207}
208
209impl FromStr for crate::expr::table_function::PbType {
210 type Err = ();
211
212 fn from_str(s: &str) -> Result<Self, Self::Err> {
213 Self::from_str_name(&s.to_uppercase()).ok_or(())
214 }
215}
216
217impl FromStr for crate::expr::agg_call::PbKind {
218 type Err = ();
219
220 fn from_str(s: &str) -> Result<Self, Self::Err> {
221 Self::from_str_name(&s.to_uppercase()).ok_or(())
222 }
223}
224
225impl stream_plan::MaterializeNode {
226 pub fn dist_key_indices(&self) -> Vec<u32> {
227 self.get_table()
228 .unwrap()
229 .distribution_key
230 .iter()
231 .map(|i| *i as u32)
232 .collect()
233 }
234
235 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
236 self.get_table()
237 .unwrap()
238 .columns
239 .iter()
240 .map(|c| c.get_column_desc().unwrap().clone())
241 .collect()
242 }
243}
244
245impl stream_plan::StreamScanNode {
246 pub fn upstream_columns(&self) -> Vec<plan_common::PbColumnDesc> {
248 self.upstream_column_ids
249 .iter()
250 .map(|id| {
251 (self.table_desc.as_ref().unwrap().columns.iter())
252 .find(|c| c.column_id == *id)
253 .unwrap()
254 .clone()
255 })
256 .collect()
257 }
258}
259
260impl stream_plan::SourceBackfillNode {
261 pub fn column_descs(&self) -> Vec<plan_common::PbColumnDesc> {
262 self.columns
263 .iter()
264 .map(|c| c.column_desc.as_ref().unwrap().clone())
265 .collect()
266 }
267}
268
269impl common::WorkerNode {
271 pub fn compute_node_parallelism(&self) -> usize {
272 assert_eq!(self.r#type(), WorkerType::ComputeNode);
273 self.property
274 .as_ref()
275 .expect("property should be exist")
276 .parallelism as usize
277 }
278
279 fn compactor_node_parallelism(&self) -> usize {
280 assert_eq!(self.r#type(), WorkerType::Compactor);
281 self.property
282 .as_ref()
283 .expect("property should be exist")
284 .parallelism as usize
285 }
286
287 pub fn parallelism(&self) -> Option<usize> {
288 match self.r#type() {
289 WorkerType::ComputeNode => Some(self.compute_node_parallelism()),
290 WorkerType::Compactor => Some(self.compactor_node_parallelism()),
291 _ => None,
292 }
293 }
294
295 pub fn resource_group(&self) -> Option<String> {
296 self.property
297 .as_ref()
298 .and_then(|p| p.resource_group.clone())
299 }
300}
301
302impl stream_plan::SourceNode {
303 pub fn column_descs(&self) -> Option<Vec<plan_common::PbColumnDesc>> {
304 Some(
305 self.source_inner
306 .as_ref()?
307 .columns
308 .iter()
309 .map(|c| c.get_column_desc().unwrap().clone())
310 .collect(),
311 )
312 }
313}
314
315impl meta::table_fragments::ActorStatus {
316 pub fn worker_id(&self) -> WorkerId {
317 self.location
318 .as_ref()
319 .expect("actor location should be exist")
320 .worker_node_id
321 }
322}
323
324impl common::WorkerNode {
325 pub fn is_streaming_schedulable(&self) -> bool {
326 let property = self.property.as_ref();
327 property.is_some_and(|p| p.is_streaming) && !property.is_some_and(|p| p.is_unschedulable)
328 }
329}
330
331impl common::ActorLocation {
332 pub fn from_worker(worker_node_id: WorkerId) -> Option<Self> {
333 Some(Self { worker_node_id })
334 }
335}
336
337impl meta::event_log::EventRecovery {
338 pub fn event_type(&self) -> &str {
339 match self.recovery_event.as_ref() {
340 Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
341 Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
342 Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
343 Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
344 Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
345 Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
346 None => "UNKNOWN_RECOVERY_EVENT",
347 }
348 }
349
350 pub fn database_recovery_start(database_id: u32) -> Self {
351 Self {
352 recovery_event: Some(RecoveryEvent::DatabaseStart(
353 event_recovery::DatabaseRecoveryStart { database_id },
354 )),
355 }
356 }
357
358 pub fn database_recovery_failure(database_id: u32) -> Self {
359 Self {
360 recovery_event: Some(RecoveryEvent::DatabaseFailure(
361 event_recovery::DatabaseRecoveryFailure { database_id },
362 )),
363 }
364 }
365
366 pub fn database_recovery_success(database_id: u32) -> Self {
367 Self {
368 recovery_event: Some(RecoveryEvent::DatabaseSuccess(
369 event_recovery::DatabaseRecoverySuccess { database_id },
370 )),
371 }
372 }
373
374 pub fn global_recovery_start(reason: String) -> Self {
375 Self {
376 recovery_event: Some(RecoveryEvent::GlobalStart(
377 event_recovery::GlobalRecoveryStart { reason },
378 )),
379 }
380 }
381
382 pub fn global_recovery_success(
383 reason: String,
384 duration_secs: f32,
385 running_database_ids: Vec<u32>,
386 recovering_database_ids: Vec<u32>,
387 ) -> Self {
388 Self {
389 recovery_event: Some(RecoveryEvent::GlobalSuccess(
390 event_recovery::GlobalRecoverySuccess {
391 reason,
392 duration_secs,
393 running_database_ids,
394 recovering_database_ids,
395 },
396 )),
397 }
398 }
399
400 pub fn global_recovery_failure(reason: String, error: String) -> Self {
401 Self {
402 recovery_event: Some(RecoveryEvent::GlobalFailure(
403 event_recovery::GlobalRecoveryFailure { reason, error },
404 )),
405 }
406 }
407}
408
409impl stream_plan::StreamNode {
410 pub fn find_stream_source(&self) -> Option<SourceId> {
414 if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
415 self.node_body.as_ref()
416 && let Some(inner) = &source.source_inner
417 {
418 return Some(inner.source_id);
419 }
420
421 for child in &self.input {
422 if let Some(source) = child.find_stream_source() {
423 return Some(source);
424 }
425 }
426
427 None
428 }
429
430 pub fn find_source_backfill(&self) -> Option<(SourceId, FragmentId)> {
438 if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
439 self.node_body.as_ref()
440 {
441 if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
442 self.input[0].node_body.as_ref().unwrap()
443 {
444 return Some((source.upstream_source_id, merge.upstream_fragment_id));
447 } else {
448 unreachable!(
449 "source backfill must have a merge node as its input: {:?}",
450 self
451 );
452 }
453 }
454
455 for child in &self.input {
456 if let Some(source) = child.find_source_backfill() {
457 return Some(source);
458 }
459 }
460
461 None
462 }
463}
464impl stream_plan::Dispatcher {
465 pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
466 stream_plan::DispatchStrategy {
467 r#type: self.r#type,
468 dist_key_indices: self.dist_key_indices.clone(),
469 output_mapping: self.output_mapping.clone(),
470 }
471 }
472}
473
474impl stream_plan::DispatchOutputMapping {
475 pub fn identical(len: usize) -> Self {
477 Self {
478 indices: (0..len as u32).collect(),
479 types: Vec::new(),
480 }
481 }
482
483 pub fn simple(indices: Vec<u32>) -> Self {
485 Self {
486 indices,
487 types: Vec::new(),
488 }
489 }
490
491 pub fn into_simple_indices(self) -> Vec<u32> {
493 assert!(
494 self.types.is_empty(),
495 "types must be empty for simple mapping"
496 );
497 self.indices
498 }
499}
500
501impl catalog::StreamSourceInfo {
502 pub fn is_shared(&self) -> bool {
504 self.cdc_source_job
505 }
506}
507
508impl stream_plan::PbStreamScanType {
509 pub fn is_reschedulable(&self) -> bool {
510 match self {
511 PbStreamScanType::UpstreamOnly => false,
513 PbStreamScanType::ArrangementBackfill => true,
514 PbStreamScanType::CrossDbSnapshotBackfill => true,
515 PbStreamScanType::SnapshotBackfill => true,
516 _ => false,
517 }
518 }
519}
520
521impl catalog::Sink {
522 pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
525
526 pub fn unique_identity(&self) -> String {
527 format!("{}", self.id)
529 }
530}
531
532impl std::fmt::Debug for meta::SystemParams {
533 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
537 f.debug_struct("SystemParams").finish_non_exhaustive()
538 }
539}
540
541impl std::fmt::Debug for data::DataType {
544 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545 let data::DataType {
546 precision,
547 scale,
548 interval_type,
549 field_type,
550 field_names,
551 field_ids,
552 type_name,
553 is_nullable: _,
555 } = self;
556
557 let type_name = data::data_type::TypeName::try_from(*type_name)
558 .map(|t| t.as_str_name())
559 .unwrap_or("Unknown");
560
561 let mut s = f.debug_struct(type_name);
562 if self.precision != 0 {
563 s.field("precision", precision);
564 }
565 if self.scale != 0 {
566 s.field("scale", scale);
567 }
568 if self.interval_type != 0 {
569 s.field("interval_type", interval_type);
570 }
571 if !self.field_type.is_empty() {
572 s.field("field_type", field_type);
573 }
574 if !self.field_names.is_empty() {
575 s.field("field_names", field_names);
576 }
577 if !self.field_ids.is_empty() {
578 s.field("field_ids", field_ids);
579 }
580 s.finish()
581 }
582}
583
584impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
586 match self {
587 Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
588 Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
589 }
590 }
591}
592
593impl std::fmt::Debug for plan_common::ColumnDesc {
594 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
595 let plan_common::ColumnDesc {
597 column_type,
598 column_id,
599 name,
600 description,
601 additional_column_type,
602 additional_column,
603 generated_or_default_column,
604 version,
605 nullable,
606 } = self;
607
608 let mut s = f.debug_struct("ColumnDesc");
609 if let Some(column_type) = column_type {
610 s.field("column_type", column_type);
611 } else {
612 s.field("column_type", &"Unknown");
613 }
614 s.field("column_id", column_id).field("name", name);
615 if let Some(description) = description {
616 s.field("description", description);
617 }
618 if self.additional_column_type != 0 {
619 s.field("additional_column_type", additional_column_type);
620 }
621 s.field("version", version);
622 if let Some(AdditionalColumn {
623 column_type: Some(column_type),
624 }) = additional_column
625 {
626 s.field("additional_column", &column_type);
628 }
629 if let Some(generated_or_default_column) = generated_or_default_column {
630 s.field("generated_or_default_column", &generated_or_default_column);
631 }
632 s.field("nullable", nullable);
633 s.finish()
634 }
635}
636
637impl expr::UserDefinedFunction {
638 pub fn name_in_runtime(&self) -> Option<&str> {
639 if self.version() < expr::UdfExprVersion::NameInRuntime {
640 if self.language == "rust" || self.language == "wasm" {
641 Some(&self.name)
645 } else {
646 self.identifier.as_deref()
648 }
649 } else {
650 self.identifier.as_deref()
652 }
653 }
654}
655
656impl expr::UserDefinedFunctionMetadata {
657 pub fn name_in_runtime(&self) -> Option<&str> {
658 if self.version() < expr::UdfExprVersion::NameInRuntime {
659 if self.language == "rust" || self.language == "wasm" {
660 let old_identifier = self
665 .identifier
666 .as_ref()
667 .expect("Rust/WASM UDF must have identifier");
668 Some(
669 old_identifier
670 .split_once("(")
671 .expect("the old identifier must contain `(`")
672 .0,
673 )
674 } else {
675 self.identifier.as_deref()
677 }
678 } else {
679 self.identifier.as_deref()
681 }
682 }
683}
684
685#[cfg(test)]
686mod tests {
687 use crate::data::{DataType, data_type};
688 use crate::plan_common::Field;
689 use crate::stream_plan::stream_node::NodeBody;
690
691 #[test]
692 fn test_getter() {
693 let data_type: DataType = DataType {
694 is_nullable: true,
695 ..Default::default()
696 };
697 let field = Field {
698 data_type: Some(data_type),
699 name: "".to_owned(),
700 };
701 assert!(field.get_data_type().unwrap().is_nullable);
702 }
703
704 #[test]
705 fn test_enum_getter() {
706 let mut data_type: DataType = DataType::default();
707 data_type.type_name = data_type::TypeName::Double as i32;
708 assert_eq!(
709 data_type::TypeName::Double,
710 data_type.get_type_name().unwrap()
711 );
712 }
713
714 #[test]
715 fn test_enum_unspecified() {
716 let mut data_type: DataType = DataType::default();
717 data_type.type_name = data_type::TypeName::TypeUnspecified as i32;
718 assert!(data_type.get_type_name().is_err());
719 }
720
721 #[test]
722 fn test_primitive_getter() {
723 let data_type: DataType = DataType::default();
724 let new_data_type = DataType {
725 is_nullable: data_type.get_is_nullable(),
726 ..Default::default()
727 };
728 assert!(!new_data_type.is_nullable);
729 }
730
731 #[test]
732 fn test_size() {
733 use static_assertions::const_assert_eq;
734 const_assert_eq!(std::mem::size_of::<NodeBody>(), 16);
737 }
738}