1#![allow(clippy::derive_partial_eq_without_eq)]
16#![recursion_limit = "256"]
17#![feature(iterator_try_collect)]
18#![feature(trait_alias)]
19#![feature(type_alias_impl_trait)]
20#![feature(more_qualified_paths)]
21#![feature(coroutines)]
22#![feature(iter_from_coroutine)]
23#![feature(proc_macro_hygiene)]
24#![feature(stmt_expr_attributes)]
25#![feature(allocator_api)]
26#![feature(map_try_insert)]
27#![feature(never_type)]
28#![feature(btreemap_alloc)]
29#![feature(error_generic_member_access)]
30#![feature(iter_order_by)]
31#![feature(exact_size_is_empty)]
32#![feature(impl_trait_in_assoc_type)]
33#![feature(test)]
34#![feature(btree_cursors)]
35#![feature(assert_matches)]
36#![feature(try_blocks)]
37
38use std::sync::Arc;
39
40use risingwave_common::config::StreamingConfig;
41
42#[macro_use]
43extern crate tracing;
44
45pub mod cache;
46pub mod common;
47pub mod error;
48pub mod executor;
49mod from_proto;
50pub mod task;
51pub mod telemetry;
52
// Compiled into test builds only.
// NOTE(review): presumably this pulls the expression implementations from
// `risingwave_expr_impl` into the test binary so unit tests can evaluate
// expressions — confirm against that crate's documentation.
#[cfg(test)]
risingwave_expr_impl::enable!();
55
// Task-local handle to the shared streaming configuration.
//
// The accessors in this file read it through `try_with`, so code running
// outside a scope where `CONFIG` has been set observes an error instead of a
// panic and falls back to the built-in defaults.
tokio::task_local! {
    pub(crate) static CONFIG: Arc<StreamingConfig>;
}
59
60mod config {
61 use risingwave_common::config::streaming::default;
62
63 pub(crate) fn chunk_size() -> usize {
64 let res = crate::CONFIG.try_with(|config| config.developer.chunk_size);
65 if res.is_err() && cfg!(not(test)) {
66 tracing::warn!("streaming CONFIG is not set, which is probably a bug")
67 }
68 res.unwrap_or_else(|_| default::developer::stream_chunk_size())
69 }
70}
71
72mod consistency {
73 use std::sync::LazyLock;
76
77 use risingwave_common::config::streaming::default;
78 use risingwave_common::util::env_var::env_var_is_true;
79
80 static INSANE_MODE: LazyLock<bool> =
81 LazyLock::new(|| env_var_is_true("RW_UNSAFE_ENABLE_INSANE_MODE"));
82
83 pub(crate) fn insane() -> bool {
85 *INSANE_MODE
86 }
87
88 pub(crate) fn enable_strict_consistency() -> bool {
90 let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency);
91 if res.is_err() && cfg!(not(test)) {
92 tracing::warn!("streaming CONFIG is not set, which is probably a bug");
93 }
94 res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency())
95 }
96
97 macro_rules! consistency_error {
100 ($($arg:tt)*) => {
101 debug_assert!(!crate::consistency::enable_strict_consistency());
102
103 use std::sync::LazyLock;
104 use risingwave_common::log::LogSuppressor;
105
106 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
107 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
108 tracing::error!(suppressed_count, $($arg)*);
109 }
110 };
111 }
112 pub(crate) use consistency_error;
113
114 macro_rules! consistency_panic {
117 ($($arg:tt)*) => {
118 if crate::consistency::enable_strict_consistency() {
119 tracing::error!($($arg)*);
120 panic!("inconsistency happened, see error log for details");
121 } else {
122 crate::consistency::consistency_error!($($arg)*);
123 }
124 };
125 }
126 pub(crate) use consistency_panic;
127}