1#![allow(clippy::derive_partial_eq_without_eq)]
16#![recursion_limit = "256"]
17#![feature(iterator_try_collect)]
18#![feature(trait_alias)]
19#![feature(type_alias_impl_trait)]
20#![feature(coroutines)]
21#![feature(proc_macro_hygiene)]
22#![feature(stmt_expr_attributes)]
23#![feature(allocator_api)]
24#![feature(map_try_insert)]
25#![feature(never_type)]
26#![feature(error_generic_member_access)]
27#![feature(exact_size_is_empty)]
28#![feature(impl_trait_in_assoc_type)]
29#![feature(btree_cursors)]
30#![feature(try_blocks)]
31
32use std::sync::Arc;
33
34use risingwave_common::config::StreamingConfig;
35
36#[macro_use]
37extern crate tracing;
38
39pub mod cache;
40pub mod common;
41pub mod error;
42pub mod executor;
43mod from_proto;
44pub mod task;
45pub mod telemetry;
46
47#[cfg(test)]
48risingwave_expr_impl::enable!();
49
50tokio::task_local! {
51 pub(crate) static CONFIG: Arc<StreamingConfig>;
52}
53
54mod config {
55 use risingwave_common::config::streaming::default;
56
57 pub(crate) fn chunk_size() -> usize {
58 let res = crate::CONFIG.try_with(|config| config.developer.chunk_size);
59 if res.is_err() && cfg!(not(test)) {
60 tracing::warn!("streaming CONFIG is not set, which is probably a bug")
61 }
62 res.unwrap_or_else(|_| default::developer::stream_chunk_size())
63 }
64}
65
66mod consistency {
67 use std::sync::LazyLock;
70
71 use risingwave_common::config::streaming::default;
72 use risingwave_common::util::env_var::env_var_is_true;
73
74 static INSANE_MODE: LazyLock<bool> =
75 LazyLock::new(|| env_var_is_true("RW_UNSAFE_ENABLE_INSANE_MODE"));
76
77 pub(crate) fn insane() -> bool {
79 *INSANE_MODE
80 }
81
82 pub(crate) fn enable_strict_consistency() -> bool {
84 let res = crate::CONFIG.try_with(|config| !config.unsafe_disable_strict_consistency);
85 if res.is_err() && cfg!(not(test)) {
86 tracing::warn!("streaming CONFIG is not set, which is probably a bug");
87 }
88 res.unwrap_or_else(|_| !default::streaming::unsafe_disable_strict_consistency())
89 }
90
91 macro_rules! consistency_error {
94 ($($arg:tt)*) => {
95 debug_assert!(!crate::consistency::enable_strict_consistency());
96
97 use std::sync::LazyLock;
98 use risingwave_common::log::LogSuppressor;
99
100 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
101 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
102 tracing::error!(suppressed_count, $($arg)*);
103 }
104 };
105 }
106 pub(crate) use consistency_error;
107
108 macro_rules! consistency_panic {
111 ($($arg:tt)*) => {
112 if crate::consistency::enable_strict_consistency() {
113 tracing::error!($($arg)*);
114 panic!("inconsistency happened, see error log for details");
115 } else {
116 crate::consistency::consistency_error!($($arg)*);
117 }
118 };
119 }
120 pub(crate) use consistency_panic;
121}