1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(iterator_try_collect)]
17#![feature(trait_alias)]
18#![feature(type_alias_impl_trait)]
19#![feature(more_qualified_paths)]
20#![feature(coroutines)]
21#![feature(iter_from_coroutine)]
22#![feature(proc_macro_hygiene)]
23#![feature(stmt_expr_attributes)]
24#![feature(allocator_api)]
25#![feature(map_try_insert)]
26#![feature(never_type)]
27#![feature(btreemap_alloc)]
28#![feature(error_generic_member_access)]
29#![feature(iter_order_by)]
30#![feature(exact_size_is_empty)]
31#![feature(impl_trait_in_assoc_type)]
32#![feature(test)]
33#![feature(btree_cursors)]
34#![feature(assert_matches)]
35#![feature(try_blocks)]
36
37use std::sync::Arc;
38
39use risingwave_common::config::StreamingConfig;
40
41#[macro_use]
42extern crate tracing;
43
44pub mod cache;
45pub mod common;
46pub mod error;
47pub mod executor;
48mod from_proto;
49pub mod task;
50pub mod telemetry;
51
52#[cfg(test)]
53risingwave_expr_impl::enable!();
54
55tokio::task_local! {
56 pub(crate) static CONFIG: Arc<StreamingConfig>;
57}
58
59mod config {
60 use risingwave_common::config::streaming::default;
61
62 pub(crate) fn chunk_size() -> usize {
63 let res = crate::CONFIG.try_with(|config| config.developer.chunk_size);
64 if res.is_err() && cfg!(not(test)) {
65 tracing::warn!("streaming CONFIG is not set, which is probably a bug")
66 }
67 res.unwrap_or_else(|_| default::developer::stream_chunk_size())
68 }
69}
70
71mod consistency {
72 use std::sync::LazyLock;
75
76 use risingwave_common::config::streaming::default;
77 use risingwave_common::util::env_var::env_var_is_true;
78
79 static INSANE_MODE: LazyLock<bool> =
80 LazyLock::new(|| env_var_is_true("RW_UNSAFE_ENABLE_INSANE_MODE"));
81
82 pub(crate) fn insane() -> bool {
84 *INSANE_MODE
85 }
86
87 pub(crate) fn enable_strict_consistency() -> bool {
89 let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency);
90 if res.is_err() && cfg!(not(test)) {
91 tracing::warn!("streaming CONFIG is not set, which is probably a bug");
92 }
93 res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency())
94 }
95
96 macro_rules! consistency_error {
99 ($($arg:tt)*) => {
100 debug_assert!(!crate::consistency::enable_strict_consistency());
101
102 use std::sync::LazyLock;
103 use risingwave_common::log::LogSuppresser;
104
105 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
106 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
107 tracing::error!(suppressed_count, $($arg)*);
108 }
109 };
110 }
111 pub(crate) use consistency_error;
112
113 macro_rules! consistency_panic {
116 ($($arg:tt)*) => {
117 if crate::consistency::enable_strict_consistency() {
118 tracing::error!($($arg)*);
119 panic!("inconsistency happened, see error log for details");
120 } else {
121 crate::consistency::consistency_error!($($arg)*);
122 }
123 };
124 }
125 pub(crate) use consistency_panic;
126}