1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(iterator_try_collect)]
17#![feature(trait_alias)]
18#![feature(type_alias_impl_trait)]
19#![feature(more_qualified_paths)]
20#![feature(let_chains)]
21#![feature(coroutines)]
22#![feature(iter_from_coroutine)]
23#![feature(proc_macro_hygiene)]
24#![feature(stmt_expr_attributes)]
25#![feature(allocator_api)]
26#![feature(map_try_insert)]
27#![feature(never_type)]
28#![feature(btreemap_alloc)]
29#![feature(error_generic_member_access)]
30#![feature(btree_extract_if)]
31#![feature(iter_order_by)]
32#![feature(exact_size_is_empty)]
33#![feature(impl_trait_in_assoc_type)]
34#![feature(test)]
35#![feature(btree_cursors)]
36#![feature(assert_matches)]
37#![feature(try_blocks)]
38#![feature(result_flattening)] use std::sync::Arc;
41
42use risingwave_common::config::StreamingConfig;
43
44#[macro_use]
45extern crate tracing;
46
47pub mod cache;
48pub mod common;
49pub mod error;
50pub mod executor;
51mod from_proto;
52pub mod task;
53pub mod telemetry;
54
55#[cfg(test)]
56risingwave_expr_impl::enable!();
57
// Task-local slot holding the shared streaming configuration.
//
// Within this file it is read through `CONFIG.try_with(..)` by
// `config::chunk_size` and `consistency::enable_strict_consistency`; both fall
// back to the built-in defaults (and warn outside of tests) when `try_with`
// returns an error, i.e. when no value is available for the current task.
tokio::task_local! {
    pub(crate) static CONFIG: Arc<StreamingConfig>;
}
61
62mod config {
63 use risingwave_common::config::default;
64
65 pub(crate) fn chunk_size() -> usize {
66 let res = crate::CONFIG.try_with(|config| config.developer.chunk_size);
67 if res.is_err() && cfg!(not(test)) {
68 tracing::warn!("streaming CONFIG is not set, which is probably a bug")
69 }
70 res.unwrap_or_else(|_| default::developer::stream_chunk_size())
71 }
72}
73
74mod consistency {
75 use std::sync::LazyLock;
78
79 use risingwave_common::config::default;
80 use risingwave_common::util::env_var::env_var_is_true;
81
82 static INSANE_MODE: LazyLock<bool> =
83 LazyLock::new(|| env_var_is_true("RW_UNSAFE_ENABLE_INSANE_MODE"));
84
85 pub(crate) fn insane() -> bool {
87 *INSANE_MODE
88 }
89
90 pub(crate) fn enable_strict_consistency() -> bool {
92 let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency);
93 if res.is_err() && cfg!(not(test)) {
94 tracing::warn!("streaming CONFIG is not set, which is probably a bug");
95 }
96 res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency())
97 }
98
99 macro_rules! consistency_error {
102 ($($arg:tt)*) => {
103 debug_assert!(!crate::consistency::enable_strict_consistency());
104
105 use std::sync::LazyLock;
106 use risingwave_common::log::LogSuppresser;
107
108 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
109 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
110 tracing::error!(suppressed_count, $($arg)*);
111 }
112 };
113 }
114 pub(crate) use consistency_error;
115
116 macro_rules! consistency_panic {
119 ($($arg:tt)*) => {
120 if crate::consistency::enable_strict_consistency() {
121 tracing::error!($($arg)*);
122 panic!("inconsistency happened, see error log for details");
123 } else {
124 crate::consistency::consistency_error!($($arg)*);
125 }
126 };
127 }
128 pub(crate) use consistency_panic;
129}