risingwave_stream/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(iterator_try_collect)]
17#![feature(trait_alias)]
18#![feature(type_alias_impl_trait)]
19#![feature(more_qualified_paths)]
20#![feature(let_chains)]
21#![feature(coroutines)]
22#![feature(iter_from_coroutine)]
23#![feature(proc_macro_hygiene)]
24#![feature(stmt_expr_attributes)]
25#![feature(allocator_api)]
26#![feature(map_try_insert)]
27#![feature(never_type)]
28#![feature(btreemap_alloc)]
29#![feature(error_generic_member_access)]
30#![feature(btree_extract_if)]
31#![feature(iter_order_by)]
32#![feature(exact_size_is_empty)]
33#![feature(impl_trait_in_assoc_type)]
34#![feature(test)]
35#![feature(btree_cursors)]
36#![feature(assert_matches)]
37#![feature(try_blocks)]
38
39use std::sync::Arc;
40
41use risingwave_common::config::StreamingConfig;
42
43#[macro_use]
44extern crate tracing;
45
46pub mod cache;
47pub mod common;
48pub mod error;
49pub mod executor;
50mod from_proto;
51pub mod task;
52pub mod telemetry;
53
54#[cfg(test)]
55risingwave_expr_impl::enable!();
56
tokio::task_local! {
    /// Task-local handle to the streaming configuration.
    ///
    /// Accessors in this crate (e.g. `config::chunk_size` and
    /// `consistency::enable_strict_consistency`) read it through `try_with` and fall back to
    /// the built-in defaults when no value has been set for the current task.
    pub(crate) static CONFIG: Arc<StreamingConfig>;
}
60
61mod config {
62    use risingwave_common::config::default;
63
64    pub(crate) fn chunk_size() -> usize {
65        let res = crate::CONFIG.try_with(|config| config.developer.chunk_size);
66        if res.is_err() && cfg!(not(test)) {
67            tracing::warn!("streaming CONFIG is not set, which is probably a bug")
68        }
69        res.unwrap_or_else(|_| default::developer::stream_chunk_size())
70    }
71}
72
73mod consistency {
74    //! This module contains global variables and methods to access the stream consistency settings.
75
76    use std::sync::LazyLock;
77
78    use risingwave_common::config::default;
79    use risingwave_common::util::env_var::env_var_is_true;
80
81    static INSANE_MODE: LazyLock<bool> =
82        LazyLock::new(|| env_var_is_true("RW_UNSAFE_ENABLE_INSANE_MODE"));
83
84    /// Check if the insane mode is enabled.
85    pub(crate) fn insane() -> bool {
86        *INSANE_MODE
87    }
88
89    /// Check if strict consistency is required.
90    pub(crate) fn enable_strict_consistency() -> bool {
91        let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency);
92        if res.is_err() && cfg!(not(test)) {
93            tracing::warn!("streaming CONFIG is not set, which is probably a bug");
94        }
95        res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency())
96    }
97
98    /// Log an error message for breaking consistency. Must only be called in non-strict mode.
99    /// The log message will be suppressed if it is called too frequently.
100    macro_rules! consistency_error {
101        ($($arg:tt)*) => {
102            debug_assert!(!crate::consistency::enable_strict_consistency());
103
104            use std::sync::LazyLock;
105            use risingwave_common::log::LogSuppresser;
106
107            static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
108            if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
109                tracing::error!(suppressed_count, $($arg)*);
110            }
111        };
112    }
113    pub(crate) use consistency_error;
114
115    /// Log an error message for breaking consistency, then panic if strict consistency is required.
116    /// The log message will be suppressed if it is called too frequently.
117    macro_rules! consistency_panic {
118        ($($arg:tt)*) => {
119            if crate::consistency::enable_strict_consistency() {
120                tracing::error!($($arg)*);
121                panic!("inconsistency happened, see error log for details");
122            } else {
123                crate::consistency::consistency_error!($($arg)*);
124            }
125        };
126    }
127    pub(crate) use consistency_panic;
128}