1#![allow(unfulfilled_lint_expectations)]
16#![allow(clippy::doc_overindented_list_items)]
17#![expect(clippy::doc_markdown)]
19#![expect(clippy::upper_case_acronyms)]
20#![expect(clippy::needless_lifetimes)]
21#![expect(clippy::disallowed_methods)]
23#![expect(clippy::enum_variant_names)]
24#![expect(clippy::module_inception)]
25#![expect(clippy::large_enum_variant)]
27
28use std::str::FromStr;
29
30use event_recovery::RecoveryEvent;
31use plan_common::AdditionalColumn;
32pub use prost::Message;
33use risingwave_error::tonic::ToTonicStatus;
34use thiserror::Error;
35
36use crate::common::WorkerType;
37use crate::meta::event_log::event_recovery;
38use crate::stream_plan::PbStreamScanType;
39
40#[rustfmt::skip]
41#[cfg_attr(madsim, path = "sim/catalog.rs")]
42pub mod catalog;
43#[rustfmt::skip]
44#[cfg_attr(madsim, path = "sim/common.rs")]
45pub mod common;
46#[rustfmt::skip]
47#[cfg_attr(madsim, path = "sim/compute.rs")]
48pub mod compute;
49#[rustfmt::skip]
50#[cfg_attr(madsim, path = "sim/cloud_service.rs")]
51pub mod cloud_service;
52#[rustfmt::skip]
53#[cfg_attr(madsim, path = "sim/data.rs")]
54pub mod data;
55#[rustfmt::skip]
56#[cfg_attr(madsim, path = "sim/ddl_service.rs")]
57pub mod ddl_service;
58#[rustfmt::skip]
59#[cfg_attr(madsim, path = "sim/expr.rs")]
60pub mod expr;
61#[rustfmt::skip]
62#[cfg_attr(madsim, path = "sim/meta.rs")]
63pub mod meta;
64#[rustfmt::skip]
65#[cfg_attr(madsim, path = "sim/plan_common.rs")]
66pub mod plan_common;
67#[rustfmt::skip]
68#[cfg_attr(madsim, path = "sim/batch_plan.rs")]
69pub mod batch_plan;
70#[rustfmt::skip]
71#[cfg_attr(madsim, path = "sim/task_service.rs")]
72pub mod task_service;
73#[rustfmt::skip]
74#[cfg_attr(madsim, path = "sim/connector_service.rs")]
75pub mod connector_service;
76#[rustfmt::skip]
77#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
78pub mod stream_plan;
79#[rustfmt::skip]
80#[cfg_attr(madsim, path = "sim/stream_service.rs")]
81pub mod stream_service;
82#[rustfmt::skip]
83#[cfg_attr(madsim, path = "sim/hummock.rs")]
84pub mod hummock;
85#[rustfmt::skip]
86#[cfg_attr(madsim, path = "sim/compactor.rs")]
87pub mod compactor;
88#[rustfmt::skip]
89#[cfg_attr(madsim, path = "sim/user.rs")]
90pub mod user;
91#[rustfmt::skip]
92#[cfg_attr(madsim, path = "sim/source.rs")]
93pub mod source;
94#[rustfmt::skip]
95#[cfg_attr(madsim, path = "sim/monitor_service.rs")]
96pub mod monitor_service;
97#[rustfmt::skip]
98#[cfg_attr(madsim, path = "sim/backup_service.rs")]
99pub mod backup_service;
100#[rustfmt::skip]
101#[cfg_attr(madsim, path = "sim/serverless_backfill_controller.rs")]
102pub mod serverless_backfill_controller;
103#[rustfmt::skip]
104#[cfg_attr(madsim, path = "sim/frontend_service.rs")]
105pub mod frontend_service;
106#[rustfmt::skip]
107#[cfg_attr(madsim, path = "sim/java_binding.rs")]
108pub mod java_binding;
109#[rustfmt::skip]
110#[cfg_attr(madsim, path = "sim/health.rs")]
111pub mod health;
112#[rustfmt::skip]
113#[path = "sim/telemetry.rs"]
114pub mod telemetry;
115
116#[rustfmt::skip]
117#[path = "sim/secret.rs"]
118pub mod secret;
119#[rustfmt::skip]
120#[path = "connector_service.serde.rs"]
121pub mod connector_service_serde;
122#[rustfmt::skip]
123#[path = "catalog.serde.rs"]
124pub mod catalog_serde;
125#[rustfmt::skip]
126#[path = "common.serde.rs"]
127pub mod common_serde;
128#[rustfmt::skip]
129#[path = "compute.serde.rs"]
130pub mod compute_serde;
131#[rustfmt::skip]
132#[path = "cloud_service.serde.rs"]
133pub mod cloud_service_serde;
134#[rustfmt::skip]
135#[path = "data.serde.rs"]
136pub mod data_serde;
137#[rustfmt::skip]
138#[path = "ddl_service.serde.rs"]
139pub mod ddl_service_serde;
140#[rustfmt::skip]
141#[path = "expr.serde.rs"]
142pub mod expr_serde;
143#[rustfmt::skip]
144#[path = "meta.serde.rs"]
145pub mod meta_serde;
146#[rustfmt::skip]
147#[path = "plan_common.serde.rs"]
148pub mod plan_common_serde;
149#[rustfmt::skip]
150#[path = "batch_plan.serde.rs"]
151pub mod batch_plan_serde;
152#[rustfmt::skip]
153#[path = "task_service.serde.rs"]
154pub mod task_service_serde;
155#[rustfmt::skip]
156#[path = "stream_plan.serde.rs"]
157pub mod stream_plan_serde;
158#[rustfmt::skip]
159#[path = "stream_service.serde.rs"]
160pub mod stream_service_serde;
161#[rustfmt::skip]
162#[path = "hummock.serde.rs"]
163pub mod hummock_serde;
164#[rustfmt::skip]
165#[path = "compactor.serde.rs"]
166pub mod compactor_serde;
167#[rustfmt::skip]
168#[path = "user.serde.rs"]
169pub mod user_serde;
170#[rustfmt::skip]
171#[path = "source.serde.rs"]
172pub mod source_serde;
173#[rustfmt::skip]
174#[path = "monitor_service.serde.rs"]
175pub mod monitor_service_serde;
176#[rustfmt::skip]
177#[path = "backup_service.serde.rs"]
178pub mod backup_service_serde;
179#[rustfmt::skip]
180#[path = "java_binding.serde.rs"]
181pub mod java_binding_serde;
182#[rustfmt::skip]
183#[path = "telemetry.serde.rs"]
184pub mod telemetry_serde;
185
186#[rustfmt::skip]
187#[path = "secret.serde.rs"]
188pub mod secret_serde;
189#[rustfmt::skip]
190#[path = "serverless_backfill_controller.serde.rs"]
191pub mod serverless_backfill_controller_serde;
192
193#[derive(Clone, PartialEq, Eq, Debug, Error)]
194#[error("field `{0}` not found")]
195pub struct PbFieldNotFound(pub &'static str);
196
197impl From<PbFieldNotFound> for tonic::Status {
198 fn from(e: PbFieldNotFound) -> Self {
199 e.to_status_unnamed(tonic::Code::Internal)
200 }
201}
202
203impl FromStr for crate::expr::table_function::PbType {
204 type Err = ();
205
206 fn from_str(s: &str) -> Result<Self, Self::Err> {
207 Self::from_str_name(&s.to_uppercase()).ok_or(())
208 }
209}
210
211impl FromStr for crate::expr::agg_call::PbKind {
212 type Err = ();
213
214 fn from_str(s: &str) -> Result<Self, Self::Err> {
215 Self::from_str_name(&s.to_uppercase()).ok_or(())
216 }
217}
218
219impl stream_plan::MaterializeNode {
220 pub fn dist_key_indices(&self) -> Vec<u32> {
221 self.get_table()
222 .unwrap()
223 .distribution_key
224 .iter()
225 .map(|i| *i as u32)
226 .collect()
227 }
228
229 pub fn column_ids(&self) -> Vec<i32> {
230 self.get_table()
231 .unwrap()
232 .columns
233 .iter()
234 .map(|c| c.get_column_desc().unwrap().column_id)
235 .collect()
236 }
237}
238
239impl common::WorkerNode {
241 pub fn compute_node_parallelism(&self) -> usize {
242 assert_eq!(self.r#type(), WorkerType::ComputeNode);
243 self.property
244 .as_ref()
245 .expect("property should be exist")
246 .parallelism as usize
247 }
248
249 pub fn parallelism(&self) -> Option<usize> {
250 if WorkerType::ComputeNode == self.r#type() {
251 Some(self.compute_node_parallelism())
252 } else {
253 None
254 }
255 }
256
257 pub fn resource_group(&self) -> Option<String> {
258 self.property
259 .as_ref()
260 .and_then(|p| p.resource_group.clone())
261 }
262}
263
264impl stream_plan::SourceNode {
265 pub fn column_ids(&self) -> Option<Vec<i32>> {
266 Some(
267 self.source_inner
268 .as_ref()?
269 .columns
270 .iter()
271 .map(|c| c.get_column_desc().unwrap().column_id)
272 .collect(),
273 )
274 }
275}
276
277impl meta::table_fragments::ActorStatus {
278 pub fn worker_id(&self) -> u32 {
279 self.location
280 .as_ref()
281 .expect("actor location should be exist")
282 .worker_node_id
283 }
284}
285
286impl common::WorkerNode {
287 pub fn is_streaming_schedulable(&self) -> bool {
288 let property = self.property.as_ref();
289 property.is_some_and(|p| p.is_streaming) && !property.is_some_and(|p| p.is_unschedulable)
290 }
291}
292
293impl common::ActorLocation {
294 pub fn from_worker(worker_node_id: u32) -> Option<Self> {
295 Some(Self { worker_node_id })
296 }
297}
298
299impl meta::event_log::EventRecovery {
300 pub fn event_type(&self) -> &str {
301 match self.recovery_event.as_ref() {
302 Some(RecoveryEvent::DatabaseStart(_)) => "DATABASE_RECOVERY_START",
303 Some(RecoveryEvent::DatabaseSuccess(_)) => "DATABASE_RECOVERY_SUCCESS",
304 Some(RecoveryEvent::DatabaseFailure(_)) => "DATABASE_RECOVERY_FAILURE",
305 Some(RecoveryEvent::GlobalStart(_)) => "GLOBAL_RECOVERY_START",
306 Some(RecoveryEvent::GlobalSuccess(_)) => "GLOBAL_RECOVERY_SUCCESS",
307 Some(RecoveryEvent::GlobalFailure(_)) => "GLOBAL_RECOVERY_FAILURE",
308 None => "UNKNOWN_RECOVERY_EVENT",
309 }
310 }
311
312 pub fn database_recovery_start(database_id: u32) -> Self {
313 Self {
314 recovery_event: Some(RecoveryEvent::DatabaseStart(
315 event_recovery::DatabaseRecoveryStart { database_id },
316 )),
317 }
318 }
319
320 pub fn database_recovery_failure(database_id: u32) -> Self {
321 Self {
322 recovery_event: Some(RecoveryEvent::DatabaseFailure(
323 event_recovery::DatabaseRecoveryFailure { database_id },
324 )),
325 }
326 }
327
328 pub fn database_recovery_success(database_id: u32) -> Self {
329 Self {
330 recovery_event: Some(RecoveryEvent::DatabaseSuccess(
331 event_recovery::DatabaseRecoverySuccess { database_id },
332 )),
333 }
334 }
335
336 pub fn global_recovery_start(reason: String) -> Self {
337 Self {
338 recovery_event: Some(RecoveryEvent::GlobalStart(
339 event_recovery::GlobalRecoveryStart { reason },
340 )),
341 }
342 }
343
344 pub fn global_recovery_success(
345 reason: String,
346 duration_secs: f32,
347 running_database_ids: Vec<u32>,
348 recovering_database_ids: Vec<u32>,
349 ) -> Self {
350 Self {
351 recovery_event: Some(RecoveryEvent::GlobalSuccess(
352 event_recovery::GlobalRecoverySuccess {
353 reason,
354 duration_secs,
355 running_database_ids,
356 recovering_database_ids,
357 },
358 )),
359 }
360 }
361
362 pub fn global_recovery_failure(reason: String, error: String) -> Self {
363 Self {
364 recovery_event: Some(RecoveryEvent::GlobalFailure(
365 event_recovery::GlobalRecoveryFailure { reason, error },
366 )),
367 }
368 }
369}
370
371impl stream_plan::StreamNode {
372 pub fn find_stream_source(&self) -> Option<u32> {
376 if let Some(crate::stream_plan::stream_node::NodeBody::Source(source)) =
377 self.node_body.as_ref()
378 {
379 if let Some(inner) = &source.source_inner {
380 return Some(inner.source_id);
381 }
382 }
383
384 for child in &self.input {
385 if let Some(source) = child.find_stream_source() {
386 return Some(source);
387 }
388 }
389
390 None
391 }
392
393 pub fn find_source_backfill(&self) -> Option<(u32, u32)> {
401 if let Some(crate::stream_plan::stream_node::NodeBody::SourceBackfill(source)) =
402 self.node_body.as_ref()
403 {
404 if let crate::stream_plan::stream_node::NodeBody::Merge(merge) =
405 self.input[0].node_body.as_ref().unwrap()
406 {
407 return Some((source.upstream_source_id, merge.upstream_fragment_id));
410 } else {
411 unreachable!(
412 "source backfill must have a merge node as its input: {:?}",
413 self
414 );
415 }
416 }
417
418 for child in &self.input {
419 if let Some(source) = child.find_source_backfill() {
420 return Some(source);
421 }
422 }
423
424 None
425 }
426}
427
428impl stream_plan::FragmentTypeFlag {
429 pub fn backfill_rate_limit_fragments() -> i32 {
431 stream_plan::FragmentTypeFlag::SourceScan as i32
432 | stream_plan::FragmentTypeFlag::StreamScan as i32
433 }
434
435 pub fn source_rate_limit_fragments() -> i32 {
438 stream_plan::FragmentTypeFlag::Source as i32 | stream_plan::FragmentTypeFlag::FsFetch as i32
439 }
440
441 pub fn sink_rate_limit_fragments() -> i32 {
443 stream_plan::FragmentTypeFlag::Sink as i32
444 }
445
446 pub fn rate_limit_fragments() -> i32 {
448 Self::backfill_rate_limit_fragments()
449 | Self::source_rate_limit_fragments()
450 | Self::sink_rate_limit_fragments()
451 }
452
453 pub fn dml_rate_limit_fragments() -> i32 {
454 stream_plan::FragmentTypeFlag::Dml as i32
455 }
456}
457
458impl stream_plan::Dispatcher {
459 pub fn as_strategy(&self) -> stream_plan::DispatchStrategy {
460 stream_plan::DispatchStrategy {
461 r#type: self.r#type,
462 dist_key_indices: self.dist_key_indices.clone(),
463 output_indices: self.output_indices.clone(),
464 }
465 }
466}
467
468impl catalog::StreamSourceInfo {
469 pub fn is_shared(&self) -> bool {
471 self.cdc_source_job
472 }
473}
474
475impl stream_plan::PbStreamScanType {
476 pub fn is_reschedulable(&self) -> bool {
477 match self {
478 PbStreamScanType::UpstreamOnly => false,
480 PbStreamScanType::ArrangementBackfill => true,
481 PbStreamScanType::CrossDbSnapshotBackfill => true,
482 PbStreamScanType::SnapshotBackfill => false,
484 _ => false,
485 }
486 }
487}
488
489impl catalog::Sink {
490 pub const UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK: &'static str = "PLACE_HOLDER";
493
494 pub fn unique_identity(&self) -> String {
495 format!("{}", self.id)
497 }
498}
499
500impl std::fmt::Debug for meta::SystemParams {
501 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 f.debug_struct("SystemParams").finish_non_exhaustive()
506 }
507}
508
509impl std::fmt::Debug for data::DataType {
512 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513 let data::DataType {
514 precision,
515 scale,
516 interval_type,
517 field_type,
518 field_names,
519 field_ids,
520 type_name,
521 is_nullable: _,
523 } = self;
524
525 let type_name = data::data_type::TypeName::try_from(*type_name)
526 .map(|t| t.as_str_name())
527 .unwrap_or("Unknown");
528
529 let mut s = f.debug_struct(type_name);
530 if self.precision != 0 {
531 s.field("precision", precision);
532 }
533 if self.scale != 0 {
534 s.field("scale", scale);
535 }
536 if self.interval_type != 0 {
537 s.field("interval_type", interval_type);
538 }
539 if !self.field_type.is_empty() {
540 s.field("field_type", field_type);
541 }
542 if !self.field_names.is_empty() {
543 s.field("field_names", field_names);
544 }
545 if !self.field_ids.is_empty() {
546 s.field("field_ids", field_ids);
547 }
548 s.finish()
549 }
550}
551
552impl std::fmt::Debug for plan_common::column_desc::GeneratedOrDefaultColumn {
553 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
554 match self {
555 Self::GeneratedColumn(arg0) => f.debug_tuple("GeneratedColumn").field(arg0).finish(),
556 Self::DefaultColumn(arg0) => f.debug_tuple("DefaultColumn").field(arg0).finish(),
557 }
558 }
559}
560
561impl std::fmt::Debug for plan_common::ColumnDesc {
562 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563 let plan_common::ColumnDesc {
565 column_type,
566 column_id,
567 name,
568 description,
569 additional_column_type,
570 additional_column,
571 generated_or_default_column,
572 version,
573 nullable,
574 } = self;
575
576 let mut s = f.debug_struct("ColumnDesc");
577 if let Some(column_type) = column_type {
578 s.field("column_type", column_type);
579 } else {
580 s.field("column_type", &"Unknown");
581 }
582 s.field("column_id", column_id).field("name", name);
583 if let Some(description) = description {
584 s.field("description", description);
585 }
586 if self.additional_column_type != 0 {
587 s.field("additional_column_type", additional_column_type);
588 }
589 s.field("version", version);
590 if let Some(AdditionalColumn {
591 column_type: Some(column_type),
592 }) = additional_column
593 {
594 s.field("additional_column", &column_type);
596 }
597 if let Some(generated_or_default_column) = generated_or_default_column {
598 s.field("generated_or_default_column", &generated_or_default_column);
599 }
600 s.field("nullable", nullable);
601 s.finish()
602 }
603}
604
605impl expr::UserDefinedFunction {
606 pub fn name_in_runtime(&self) -> Option<&str> {
607 if self.version() < expr::UdfExprVersion::NameInRuntime {
608 if self.language == "rust" || self.language == "wasm" {
609 Some(&self.name)
613 } else {
614 self.identifier.as_deref()
616 }
617 } else {
618 self.identifier.as_deref()
620 }
621 }
622}
623
624impl expr::UserDefinedFunctionMetadata {
625 pub fn name_in_runtime(&self) -> Option<&str> {
626 if self.version() < expr::UdfExprVersion::NameInRuntime {
627 if self.language == "rust" || self.language == "wasm" {
628 let old_identifier = self
633 .identifier
634 .as_ref()
635 .expect("Rust/WASM UDF must have identifier");
636 Some(
637 old_identifier
638 .split_once("(")
639 .expect("the old identifier must contain `(`")
640 .0,
641 )
642 } else {
643 self.identifier.as_deref()
645 }
646 } else {
647 self.identifier.as_deref()
649 }
650 }
651}
652
#[cfg(test)]
mod tests {
    use crate::data::{DataType, data_type};
    use crate::plan_common::Field;
    use crate::stream_plan::stream_node::NodeBody;

    /// A message-typed field is reachable through its generated getter.
    #[test]
    fn test_getter() {
        let field = Field {
            data_type: Some(DataType {
                is_nullable: true,
                ..Default::default()
            }),
            name: String::new(),
        };
        let fetched = field.get_data_type().unwrap();
        assert!(fetched.is_nullable);
    }

    /// An enum-typed field set to a concrete variant is returned by its getter.
    #[test]
    fn test_enum_getter() {
        let data_type = DataType {
            type_name: data_type::TypeName::Double as i32,
            ..Default::default()
        };
        let fetched = data_type.get_type_name().unwrap();
        assert_eq!(data_type::TypeName::Double, fetched);
    }

    /// The unspecified enum value is reported as an error by the getter.
    #[test]
    fn test_enum_unspecified() {
        let data_type = DataType {
            type_name: data_type::TypeName::TypeUnspecified as i32,
            ..Default::default()
        };
        assert!(data_type.get_type_name().is_err());
    }

    /// A primitive getter on a default message returns the default value.
    #[test]
    fn test_primitive_getter() {
        let source = DataType::default();
        let copy = DataType {
            is_nullable: source.get_is_nullable(),
            ..Default::default()
        };
        assert!(!copy.is_nullable);
    }

    /// Compile-time check that `NodeBody` is exactly 16 bytes.
    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;

        const EXPECTED_SIZE: usize = 16;
        const_assert_eq!(std::mem::size_of::<NodeBody>(), EXPECTED_SIZE);
    }
}