risingwave_stream/executor/
actor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use futures::FutureExt;
22use futures::future::join_all;
23use hytra::TrAdder;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::config::StreamingConfig;
26use risingwave_common::hash::VirtualNode;
27use risingwave_common::log::LogSuppressor;
28use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_expr::ExprError;
31use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
32use risingwave_pb::plan_common::ExprContext;
33use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
34use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
35use risingwave_rpc_client::MetaClient;
36use thiserror_ext::AsReport;
37use tokio_stream::StreamExt;
38use tracing::Instrument;
39
40use super::StreamConsumer;
41use super::monitor::StreamingMetrics;
42use super::subtask::SubtaskHandle;
43use crate::error::StreamResult;
44use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
45
46/// Shared by all operators of an actor.
47pub struct ActorContext {
48    pub id: ActorId,
49    pub fragment_id: u32,
50    pub vnode_count: usize,
51    pub mview_definition: String,
52
53    // TODO(eric): these seem to be useless now?
54    last_mem_val: Arc<AtomicUsize>,
55    cur_mem_val: Arc<AtomicUsize>,
56    total_mem_val: Arc<TrAdder<i64>>,
57
58    pub streaming_metrics: Arc<StreamingMetrics>,
59
60    /// This is the number of dispatchers when the actor is created. It will not be updated during runtime when new downstreams are added.
61    pub initial_dispatch_num: usize,
62    // mv_table_id to subscription id
63    pub initial_subscriber_ids: HashSet<u32>,
64    pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,
65
66    // Meta client. currently used for auto schema change. `None` for test only
67    pub meta_client: Option<MetaClient>,
68
69    pub streaming_config: Arc<StreamingConfig>,
70
71    pub stream_env: StreamEnvironment,
72}
73
/// Shared, reference-counted handle to an [`ActorContext`].
pub type ActorContextRef = Arc<ActorContext>;
75
76impl ActorContext {
77    pub fn for_test(id: ActorId) -> ActorContextRef {
78        Arc::new(Self {
79            id,
80            fragment_id: 0,
81            vnode_count: VirtualNode::COUNT_FOR_TEST,
82            mview_definition: "".to_owned(),
83            cur_mem_val: Arc::new(0.into()),
84            last_mem_val: Arc::new(0.into()),
85            total_mem_val: Arc::new(TrAdder::new()),
86            streaming_metrics: Arc::new(StreamingMetrics::unused()),
87            // Set 1 for test to enable sanity check on table
88            initial_dispatch_num: 1,
89            initial_subscriber_ids: Default::default(),
90            initial_upstream_actors: Default::default(),
91            meta_client: None,
92            streaming_config: Arc::new(StreamingConfig::default()),
93            stream_env: StreamEnvironment::for_test(),
94        })
95    }
96
97    #[allow(clippy::too_many_arguments)]
98    pub fn create(
99        stream_actor: &BuildActorInfo,
100        fragment_id: FragmentId,
101        total_mem_val: Arc<TrAdder<i64>>,
102        streaming_metrics: Arc<StreamingMetrics>,
103        meta_client: Option<MetaClient>,
104        streaming_config: Arc<StreamingConfig>,
105        stream_env: StreamEnvironment,
106    ) -> ActorContextRef {
107        Arc::new(Self {
108            id: stream_actor.actor_id,
109            fragment_id,
110            mview_definition: stream_actor.mview_definition.clone(),
111            vnode_count: (stream_actor.vnode_bitmap.as_ref())
112                // An unset `vnode_bitmap` means the actor is a singleton,
113                // where only `SINGLETON_VNODE` is set.
114                .map_or(1, |b| Bitmap::from(b).len()),
115            cur_mem_val: Arc::new(0.into()),
116            last_mem_val: Arc::new(0.into()),
117            total_mem_val,
118            streaming_metrics,
119            initial_dispatch_num: stream_actor.dispatchers.len(),
120            initial_subscriber_ids: stream_actor
121                .initial_subscriber_ids
122                .iter()
123                .copied()
124                .collect(),
125            initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
126            meta_client,
127            streaming_config,
128            stream_env,
129        })
130    }
131
132    pub fn on_compute_error(&self, err: ExprError, identity: &str) {
133        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
134        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
135            tracing::error!(identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
136        }
137
138        let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
139        GLOBAL_ERROR_METRICS.user_compute_error.report([
140            "ExprError".to_owned(),
141            executor_name.to_owned(),
142            self.fragment_id.to_string(),
143        ]);
144    }
145
146    pub fn store_mem_usage(&self, val: usize) {
147        // Record the last mem val.
148        // Calculate the difference between old val and new value, and apply the diff to total
149        // memory usage value.
150        let old_value = self.cur_mem_val.load(Ordering::Relaxed);
151        self.last_mem_val.store(old_value, Ordering::Relaxed);
152        let diff = val as i64 - old_value as i64;
153
154        self.total_mem_val.inc(diff);
155
156        self.cur_mem_val.store(val, Ordering::Relaxed);
157    }
158
159    pub fn mem_usage(&self) -> usize {
160        self.cur_mem_val.load(Ordering::Relaxed)
161    }
162}
163
/// `Actor` is the basic execution unit in the streaming framework.
///
/// It owns a consumer and a set of subtasks; `run` drives both concurrently until the consumer
/// stops or fails.
pub struct Actor<C> {
    /// The [`StreamConsumer`] of the actor.
    consumer: C,
    /// The subtasks to execute concurrently.
    subtasks: Vec<SubtaskHandle>,

    /// Context shared by all operators of this actor.
    pub actor_context: ActorContextRef,
    /// Expression context that `run` installs around the actor's future.
    expr_context: ExprContext,
    /// Barrier manager to which the barriers yielded by the consumer are reported via `collect`.
    barrier_manager: LocalBarrierManager,
}
175
176impl<C> Actor<C>
177where
178    C: StreamConsumer,
179{
180    pub fn new(
181        consumer: C,
182        subtasks: Vec<SubtaskHandle>,
183        _metrics: Arc<StreamingMetrics>,
184        actor_context: ActorContextRef,
185        expr_context: ExprContext,
186        barrier_manager: LocalBarrierManager,
187    ) -> Self {
188        Self {
189            consumer,
190            subtasks,
191            actor_context,
192            expr_context,
193            barrier_manager,
194        }
195    }
196
197    #[inline(always)]
198    pub async fn run(mut self) -> StreamResult<()> {
199        let expr_context = self.expr_context.clone();
200        let fragment_id = self.actor_context.fragment_id;
201        let vnode_count = self.actor_context.vnode_count;
202
203        let run = async move {
204            tokio::join!(
205                // Drive the subtasks concurrently.
206                join_all(std::mem::take(&mut self.subtasks)),
207                self.run_consumer(),
208            )
209            .1
210        }
211        .boxed();
212
213        // Attach contexts to the future.
214        let run = expr_context_scope(expr_context, run);
215        let run = FRAGMENT_ID::scope(fragment_id, run);
216        let run = VNODE_COUNT::scope(vnode_count, run);
217
218        run.await
219    }
220
221    async fn run_consumer(self) -> StreamResult<()> {
222        fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
223            "intentional start_actors_err"
224        )
225        .into()));
226
227        let id = self.actor_context.id;
228        let span_name = format!("Actor {id}");
229
230        let new_span = |epoch: Option<EpochPair>| {
231            tracing::info_span!(
232                parent: None,
233                "actor",
234                "otel.name" = span_name,
235                actor_id = id,
236                prev_epoch = epoch.map(|e| e.prev),
237                curr_epoch = epoch.map(|e| e.curr),
238            )
239        };
240        let mut span = new_span(None);
241
242        let actor_count = self
243            .actor_context
244            .streaming_metrics
245            .actor_count
246            .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
247        let _actor_count_guard = actor_count.inc_guard();
248
249        let current_epoch = self
250            .actor_context
251            .streaming_metrics
252            .actor_current_epoch
253            .with_guarded_label_values(&[
254                &self.actor_context.id.to_string(),
255                &self.actor_context.fragment_id.to_string(),
256            ]);
257
258        let mut last_epoch: Option<EpochPair> = None;
259        let mut stream = Box::pin(Box::new(self.consumer).execute());
260
261        // Drive the streaming task with an infinite loop
262        let result = loop {
263            let barrier = match stream
264                .try_next()
265                .instrument(span.clone())
266                .instrument_await(
267                    last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
268                        await_tree::span!("Epoch {}", e.curr)
269                    }),
270                )
271                .await
272            {
273                Ok(Some(barrier)) => barrier,
274                Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
275                Err(err) => break Err(err),
276            };
277
278            fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
279                "intentional collect_actors_err"
280            )
281            .into()));
282
283            // Then stop this actor if asked
284            if barrier.is_stop(id) {
285                debug!(actor_id = id, epoch = ?barrier.epoch, "stop at barrier");
286                break Ok(barrier);
287            }
288
289            current_epoch.set(barrier.epoch.curr as i64);
290
291            // Collect barriers to local barrier manager
292            self.barrier_manager.collect(id, &barrier);
293
294            // Tracing related work
295            last_epoch = Some(barrier.epoch);
296            span = barrier.tracing_context().attach(new_span(last_epoch));
297        };
298
299        spawn_blocking_drop_stream(stream).await;
300
301        let result = result.map(|stop_barrier| {
302            // Collect the stop barrier after the stream has been dropped to ensure that all resources
303            self.barrier_manager.collect(id, &stop_barrier);
304        });
305
306        tracing::debug!(actor_id = id, ok = result.is_ok(), "actor exit");
307        result
308    }
309}
310
311/// Drop the stream in a blocking task to avoid interfering with other actors.
312///
313/// Logically the actor is dropped after we send the barrier with `Drop` mutation to the
314/// downstream, thus making the `drop`'s progress asynchronous. However, there might be a
315/// considerable amount of data in the executors' in-memory cache, dropping these structures might
316/// be a CPU-intensive task. This may lead to the runtime being unable to schedule other actors if
317/// the `drop` is called on the current thread.
318pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
319    let _ = tokio::task::spawn_blocking(move || drop(stream))
320        .instrument_await("drop_stream")
321        .await;
322}