risingwave_stream/executor/
actor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use futures::FutureExt;
22use futures::future::join_all;
23use hytra::TrAdder;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::TableId;
26use risingwave_common::config::StreamingConfig;
27use risingwave_common::hash::VirtualNode;
28use risingwave_common::log::LogSuppresser;
29use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
30use risingwave_common::util::epoch::EpochPair;
31use risingwave_expr::ExprError;
32use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
33use risingwave_pb::plan_common::ExprContext;
34use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
35use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
36use risingwave_rpc_client::MetaClient;
37use thiserror_ext::AsReport;
38use tokio_stream::StreamExt;
39use tracing::Instrument;
40
41use super::StreamConsumer;
42use super::monitor::StreamingMetrics;
43use super::subtask::SubtaskHandle;
44use crate::error::StreamResult;
45use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
46
47/// Shared by all operators of an actor.
48pub struct ActorContext {
49    pub id: ActorId,
50    pub fragment_id: u32,
51    pub vnode_count: usize,
52    pub mview_definition: String,
53
54    // TODO(eric): these seem to be useless now?
55    last_mem_val: Arc<AtomicUsize>,
56    cur_mem_val: Arc<AtomicUsize>,
57    total_mem_val: Arc<TrAdder<i64>>,
58
59    pub streaming_metrics: Arc<StreamingMetrics>,
60
61    /// This is the number of dispatchers when the actor is created. It will not be updated during runtime when new downstreams are added.
62    pub initial_dispatch_num: usize,
63    // mv_table_id to subscription id
64    pub related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
65    pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,
66
67    // Meta client. currently used for auto schema change. `None` for test only
68    pub meta_client: Option<MetaClient>,
69
70    pub streaming_config: Arc<StreamingConfig>,
71
72    pub stream_env: StreamEnvironment,
73}
74
75pub type ActorContextRef = Arc<ActorContext>;
76
77impl ActorContext {
78    pub fn for_test(id: ActorId) -> ActorContextRef {
79        Arc::new(Self {
80            id,
81            fragment_id: 0,
82            vnode_count: VirtualNode::COUNT_FOR_TEST,
83            mview_definition: "".to_owned(),
84            cur_mem_val: Arc::new(0.into()),
85            last_mem_val: Arc::new(0.into()),
86            total_mem_val: Arc::new(TrAdder::new()),
87            streaming_metrics: Arc::new(StreamingMetrics::unused()),
88            // Set 1 for test to enable sanity check on table
89            initial_dispatch_num: 1,
90            related_subscriptions: HashMap::new().into(),
91            initial_upstream_actors: Default::default(),
92            meta_client: None,
93            streaming_config: Arc::new(StreamingConfig::default()),
94            stream_env: StreamEnvironment::for_test(),
95        })
96    }
97
98    #[allow(clippy::too_many_arguments)]
99    pub fn create(
100        stream_actor: &BuildActorInfo,
101        fragment_id: FragmentId,
102        total_mem_val: Arc<TrAdder<i64>>,
103        streaming_metrics: Arc<StreamingMetrics>,
104        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
105        meta_client: Option<MetaClient>,
106        streaming_config: Arc<StreamingConfig>,
107        stream_env: StreamEnvironment,
108    ) -> ActorContextRef {
109        Arc::new(Self {
110            id: stream_actor.actor_id,
111            fragment_id,
112            mview_definition: stream_actor.mview_definition.clone(),
113            vnode_count: (stream_actor.vnode_bitmap.as_ref())
114                // An unset `vnode_bitmap` means the actor is a singleton,
115                // where only `SINGLETON_VNODE` is set.
116                .map_or(1, |b| Bitmap::from(b).len()),
117            cur_mem_val: Arc::new(0.into()),
118            last_mem_val: Arc::new(0.into()),
119            total_mem_val,
120            streaming_metrics,
121            initial_dispatch_num: stream_actor.dispatchers.len(),
122            related_subscriptions,
123            initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
124            meta_client,
125            streaming_config,
126            stream_env,
127        })
128    }
129
130    pub fn on_compute_error(&self, err: ExprError, identity: &str) {
131        static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
132        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
133            tracing::error!(identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
134        }
135
136        let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
137        GLOBAL_ERROR_METRICS.user_compute_error.report([
138            "ExprError".to_owned(),
139            executor_name.to_owned(),
140            self.fragment_id.to_string(),
141        ]);
142    }
143
144    pub fn store_mem_usage(&self, val: usize) {
145        // Record the last mem val.
146        // Calculate the difference between old val and new value, and apply the diff to total
147        // memory usage value.
148        let old_value = self.cur_mem_val.load(Ordering::Relaxed);
149        self.last_mem_val.store(old_value, Ordering::Relaxed);
150        let diff = val as i64 - old_value as i64;
151
152        self.total_mem_val.inc(diff);
153
154        self.cur_mem_val.store(val, Ordering::Relaxed);
155    }
156
157    pub fn mem_usage(&self) -> usize {
158        self.cur_mem_val.load(Ordering::Relaxed)
159    }
160}
161
162/// `Actor` is the basic execution unit in the streaming framework.
163pub struct Actor<C> {
164    /// The [`StreamConsumer`] of the actor.
165    consumer: C,
166    /// The subtasks to execute concurrently.
167    subtasks: Vec<SubtaskHandle>,
168
169    pub actor_context: ActorContextRef,
170    expr_context: ExprContext,
171    barrier_manager: LocalBarrierManager,
172}
173
174impl<C> Actor<C>
175where
176    C: StreamConsumer,
177{
178    pub fn new(
179        consumer: C,
180        subtasks: Vec<SubtaskHandle>,
181        _metrics: Arc<StreamingMetrics>,
182        actor_context: ActorContextRef,
183        expr_context: ExprContext,
184        barrier_manager: LocalBarrierManager,
185    ) -> Self {
186        Self {
187            consumer,
188            subtasks,
189            actor_context,
190            expr_context,
191            barrier_manager,
192        }
193    }
194
195    #[inline(always)]
196    pub async fn run(mut self) -> StreamResult<()> {
197        let expr_context = self.expr_context.clone();
198        let fragment_id = self.actor_context.fragment_id;
199        let vnode_count = self.actor_context.vnode_count;
200
201        let run = async move {
202            tokio::join!(
203                // Drive the subtasks concurrently.
204                join_all(std::mem::take(&mut self.subtasks)),
205                self.run_consumer(),
206            )
207            .1
208        }
209        .boxed();
210
211        // Attach contexts to the future.
212        let run = expr_context_scope(expr_context, run);
213        let run = FRAGMENT_ID::scope(fragment_id, run);
214        let run = VNODE_COUNT::scope(vnode_count, run);
215
216        run.await
217    }
218
219    async fn run_consumer(self) -> StreamResult<()> {
220        fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
221            "intentional start_actors_err"
222        )
223        .into()));
224
225        let id = self.actor_context.id;
226        let span_name = format!("Actor {id}");
227
228        let new_span = |epoch: Option<EpochPair>| {
229            tracing::info_span!(
230                parent: None,
231                "actor",
232                "otel.name" = span_name,
233                actor_id = id,
234                prev_epoch = epoch.map(|e| e.prev),
235                curr_epoch = epoch.map(|e| e.curr),
236            )
237        };
238        let mut span = new_span(None);
239
240        let actor_count = self
241            .actor_context
242            .streaming_metrics
243            .actor_count
244            .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
245        let _actor_count_guard = actor_count.inc_guard();
246
247        let current_epoch = self
248            .actor_context
249            .streaming_metrics
250            .actor_current_epoch
251            .with_guarded_label_values(&[
252                &self.actor_context.id.to_string(),
253                &self.actor_context.fragment_id.to_string(),
254            ]);
255
256        let mut last_epoch: Option<EpochPair> = None;
257        let mut stream = Box::pin(Box::new(self.consumer).execute());
258
259        // Drive the streaming task with an infinite loop
260        let result = loop {
261            let barrier = match stream
262                .try_next()
263                .instrument(span.clone())
264                .instrument_await(
265                    last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
266                        await_tree::span!("Epoch {}", e.curr)
267                    }),
268                )
269                .await
270            {
271                Ok(Some(barrier)) => barrier,
272                Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
273                Err(err) => break Err(err),
274            };
275
276            fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
277                "intentional collect_actors_err"
278            )
279            .into()));
280
281            // Then stop this actor if asked
282            if barrier.is_stop(id) {
283                debug!(actor_id = id, epoch = ?barrier.epoch, "stop at barrier");
284                break Ok(barrier);
285            }
286
287            current_epoch.set(barrier.epoch.curr as i64);
288
289            // Collect barriers to local barrier manager
290            self.barrier_manager.collect(id, &barrier);
291
292            // Tracing related work
293            last_epoch = Some(barrier.epoch);
294            span = barrier.tracing_context().attach(new_span(last_epoch));
295        };
296
297        spawn_blocking_drop_stream(stream).await;
298
299        let result = result.map(|stop_barrier| {
300            // Collect the stop barrier after the stream has been dropped to ensure that all resources
301            self.barrier_manager.collect(id, &stop_barrier);
302        });
303
304        tracing::debug!(actor_id = id, ok = result.is_ok(), "actor exit");
305        result
306    }
307}
308
309/// Drop the stream in a blocking task to avoid interfering with other actors.
310///
311/// Logically the actor is dropped after we send the barrier with `Drop` mutation to the
312/// downstream, thus making the `drop`'s progress asynchronous. However, there might be a
313/// considerable amount of data in the executors' in-memory cache, dropping these structures might
314/// be a CPU-intensive task. This may lead to the runtime being unable to schedule other actors if
315/// the `drop` is called on the current thread.
316pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
317    let _ = tokio::task::spawn_blocking(move || drop(stream))
318        .instrument_await("drop_stream")
319        .await;
320}