risingwave_stream/executor/
actor.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use futures::FutureExt;
22use futures::future::join_all;
23use hytra::TrAdder;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::config::StreamingConfig;
26use risingwave_common::hash::VirtualNode;
27use risingwave_common::log::LogSuppressor;
28use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_expr::ExprError;
31use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
32use risingwave_pb::id::SubscriberId;
33use risingwave_pb::plan_common::ExprContext;
34use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
35use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
36use risingwave_rpc_client::MetaClient;
37use thiserror_ext::AsReport;
38use tokio_stream::StreamExt;
39use tracing::Instrument;
40
41use super::StreamConsumer;
42use super::monitor::StreamingMetrics;
43use super::subtask::SubtaskHandle;
44use crate::CONFIG;
45use crate::error::StreamResult;
46use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
47
/// Shared by all operators of an actor.
pub struct ActorContext {
    /// Id of this actor.
    pub id: ActorId,
    /// Id of the fragment this actor belongs to.
    pub fragment_id: FragmentId,
    /// Number of vnodes owned by this actor. `1` for singleton actors
    /// (see `create`, where an unset vnode bitmap maps to 1).
    pub vnode_count: usize,
    /// Definition text of the streaming job (e.g. the `CREATE MATERIALIZED VIEW`
    /// statement) this actor is part of. Not read within this file.
    pub mview_definition: String,

    // Memory-usage tracking, maintained by `store_mem_usage` below.
    // TODO(eric): these seem to be useless now?
    /// The usage value recorded by the previous `store_mem_usage` call.
    last_mem_val: Arc<AtomicUsize>,
    /// The most recently recorded memory usage of this actor.
    cur_mem_val: Arc<AtomicUsize>,
    /// Shared counter accumulating memory-usage deltas; passed in from outside
    /// on creation, so presumably shared across actors — TODO confirm.
    total_mem_val: Arc<TrAdder<i64>>,

    /// Handle to the streaming metrics registry.
    pub streaming_metrics: Arc<StreamingMetrics>,

    /// This is the number of dispatchers when the actor is created. It will not be updated during runtime when new downstreams are added.
    pub initial_dispatch_num: usize,
    // Subscriber ids present when the actor was created.
    // NOTE(review): a previous comment said "mv_table_id to subscription id",
    // but this is a set, not a map — verify intended semantics.
    pub initial_subscriber_ids: HashSet<SubscriberId>,
    /// Upstream actors per upstream fragment, as known at creation time.
    pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,

    // Meta client. currently used for auto schema change. `None` for test only
    pub meta_client: Option<MetaClient>,

    /// The local streaming configuration for this specific actor.
    ///
    /// Compared to `stream_env.global_config`, this config can have some entries overridden by the user.
    pub config: Arc<StreamingConfig>,

    /// The environment shared by the streaming tasks on this node.
    pub stream_env: StreamEnvironment,
}
78
/// Shared, reference-counted handle to an [`ActorContext`].
pub type ActorContextRef = Arc<ActorContext>;
80
81impl ActorContext {
82    pub fn for_test(id: impl Into<ActorId>) -> ActorContextRef {
83        Self::for_test_with_config(id, StreamingConfig::default())
84    }
85
86    pub fn for_test_with_config(
87        id: impl Into<ActorId>,
88        config: StreamingConfig,
89    ) -> ActorContextRef {
90        Arc::new(Self {
91            id: id.into(),
92            fragment_id: 0.into(),
93            vnode_count: VirtualNode::COUNT_FOR_TEST,
94            mview_definition: "".to_owned(),
95            cur_mem_val: Arc::new(0.into()),
96            last_mem_val: Arc::new(0.into()),
97            total_mem_val: Arc::new(TrAdder::new()),
98            streaming_metrics: Arc::new(StreamingMetrics::unused()),
99            // Set 1 for test to enable sanity check on table
100            initial_dispatch_num: 1,
101            initial_subscriber_ids: Default::default(),
102            initial_upstream_actors: Default::default(),
103            meta_client: None,
104            config: Arc::new(config),
105            stream_env: StreamEnvironment::for_test(),
106        })
107    }
108
109    #[allow(clippy::too_many_arguments)]
110    pub fn create(
111        stream_actor: &BuildActorInfo,
112        fragment_id: FragmentId,
113        total_mem_val: Arc<TrAdder<i64>>,
114        streaming_metrics: Arc<StreamingMetrics>,
115        meta_client: Option<MetaClient>,
116        config: Arc<StreamingConfig>,
117        stream_env: StreamEnvironment,
118    ) -> ActorContextRef {
119        Arc::new(Self {
120            id: stream_actor.actor_id,
121            fragment_id,
122            mview_definition: stream_actor.mview_definition.clone(),
123            vnode_count: (stream_actor.vnode_bitmap.as_ref())
124                // An unset `vnode_bitmap` means the actor is a singleton,
125                // where only `SINGLETON_VNODE` is set.
126                .map_or(1, |b| Bitmap::from(b).len()),
127            cur_mem_val: Arc::new(0.into()),
128            last_mem_val: Arc::new(0.into()),
129            total_mem_val,
130            streaming_metrics,
131            initial_dispatch_num: stream_actor.dispatchers.len(),
132            initial_subscriber_ids: stream_actor
133                .initial_subscriber_ids
134                .iter()
135                .copied()
136                .collect(),
137            initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
138            meta_client,
139            config,
140            stream_env,
141        })
142    }
143
144    pub fn on_compute_error(&self, err: ExprError, identity: &str) {
145        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
146        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
147            tracing::error!(target: "stream_expr_error", identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
148        }
149
150        let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
151        GLOBAL_ERROR_METRICS.user_compute_error.report([
152            "ExprError".to_owned(),
153            executor_name.to_owned(),
154            self.fragment_id.to_string(),
155        ]);
156    }
157
158    pub fn store_mem_usage(&self, val: usize) {
159        // Record the last mem val.
160        // Calculate the difference between old val and new value, and apply the diff to total
161        // memory usage value.
162        let old_value = self.cur_mem_val.load(Ordering::Relaxed);
163        self.last_mem_val.store(old_value, Ordering::Relaxed);
164        let diff = val as i64 - old_value as i64;
165
166        self.total_mem_val.inc(diff);
167
168        self.cur_mem_val.store(val, Ordering::Relaxed);
169    }
170
171    pub fn mem_usage(&self) -> usize {
172        self.cur_mem_val.load(Ordering::Relaxed)
173    }
174}
175
/// `Actor` is the basic execution unit in the streaming framework.
pub struct Actor<C> {
    /// The [`StreamConsumer`] of the actor.
    consumer: C,
    /// The subtasks to execute concurrently.
    subtasks: Vec<SubtaskHandle>,

    /// Shared context for all operators of this actor.
    pub actor_context: ActorContextRef,
    /// Expression context attached to the actor's future in `run`.
    expr_context: ExprContext,
    /// Used to report collected barriers to the local barrier manager.
    barrier_manager: LocalBarrierManager,
}
187
impl<C> Actor<C>
where
    C: StreamConsumer,
{
    /// Create a new actor from its consumer and contexts.
    ///
    /// `_metrics` is currently unused; metrics are reached through
    /// `actor_context.streaming_metrics` instead.
    pub fn new(
        consumer: C,
        subtasks: Vec<SubtaskHandle>,
        _metrics: Arc<StreamingMetrics>,
        actor_context: ActorContextRef,
        expr_context: ExprContext,
        barrier_manager: LocalBarrierManager,
    ) -> Self {
        Self {
            consumer,
            subtasks,
            actor_context,
            expr_context,
            barrier_manager,
        }
    }

    /// Run the actor to completion.
    ///
    /// Wraps [`Self::run_consumer`] with the task-local scopes (expression
    /// context, fragment id, vnode count and streaming config) that operators
    /// read during execution, and drives the subtasks concurrently alongside
    /// the consumer.
    #[inline(always)]
    pub async fn run(mut self) -> StreamResult<()> {
        let expr_context = self.expr_context.clone();
        let fragment_id = self.actor_context.fragment_id;
        let vnode_count = self.actor_context.vnode_count;
        let config = self.actor_context.config.clone();

        let run = async move {
            tokio::join!(
                // Drive the subtasks concurrently.
                join_all(std::mem::take(&mut self.subtasks)),
                self.run_consumer(),
            )
            // Only the consumer's result is propagated; subtask outputs are dropped.
            .1
        }
        .boxed();

        // Attach contexts to the future.
        let run = expr_context_scope(expr_context, run);
        let run = FRAGMENT_ID::scope(fragment_id, run);
        let run = VNODE_COUNT::scope(vnode_count, run);
        let run = CONFIG.scope(config, run);

        run.await
    }

    /// Drive the consumer stream barrier-by-barrier until the actor is asked
    /// to stop or an error occurs.
    ///
    /// Each item yielded by the consumer is a barrier: it is collected to the
    /// local barrier manager and the per-epoch tracing span / epoch metric are
    /// refreshed. A stop barrier breaks the loop and is collected only after
    /// the stream has been dropped.
    async fn run_consumer(self) -> StreamResult<()> {
        fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
            "intentional start_actors_err"
        )
        .into()));

        let id = self.actor_context.id;
        let span_name = format!("Actor {id}");

        // A fresh root span per epoch; linked to the barrier's tracing context
        // at the bottom of the loop.
        let new_span = |epoch: Option<EpochPair>| {
            tracing::info_span!(
                parent: None,
                "actor",
                "otel.name" = span_name,
                actor_id = %id,
                prev_epoch = epoch.map(|e| e.prev),
                curr_epoch = epoch.map(|e| e.curr),
            )
        };
        let mut span = new_span(None);

        // Gauge of running actors per fragment; decremented when the guard drops
        // at the end of this function.
        let actor_count = self
            .actor_context
            .streaming_metrics
            .actor_count
            .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
        let _actor_count_guard = actor_count.inc_guard();

        let current_epoch = self
            .actor_context
            .streaming_metrics
            .actor_current_epoch
            .with_guarded_label_values(&[
                &self.actor_context.id.to_string(),
                &self.actor_context.fragment_id.to_string(),
            ]);

        let mut last_epoch: Option<EpochPair> = None;
        let mut stream = Box::pin(Box::new(self.consumer).execute());

        // Drive the streaming task with an infinite loop
        let result = loop {
            let barrier = match stream
                .try_next()
                .instrument(span.clone())
                .instrument_await(
                    last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
                        await_tree::span!("Epoch {}", e.curr)
                    }),
                )
                .await
            {
                Ok(Some(barrier)) => barrier,
                // The consumer is expected to terminate only via a stop barrier,
                // never by ending the stream on its own.
                Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
                Err(err) => break Err(err),
            };

            fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
                "intentional collect_actors_err"
            )
            .into()));

            // Then stop this actor if asked. The stop barrier itself is collected
            // later, after the stream has been dropped.
            if barrier.is_stop(id) {
                debug!(actor_id = %id, epoch = ?barrier.epoch, "stop at barrier");
                break Ok(barrier);
            }

            current_epoch.set(barrier.epoch.curr as i64);

            // Collect barriers to local barrier manager
            self.barrier_manager.collect(id, &barrier);

            // Tracing related work
            last_epoch = Some(barrier.epoch);
            span = barrier.tracing_context().attach(new_span(last_epoch));
        };

        spawn_blocking_drop_stream(stream).await;

        let result = result.map(|stop_barrier| {
            // Collect the stop barrier only after the stream has been dropped, so
            // that all resources held by the executors are released before this
            // barrier is reported as collected.
            self.barrier_manager.collect(id, &stop_barrier);
        });

        tracing::debug!(actor_id = %id, ok = result.is_ok(), "actor exit");
        result
    }
}
324
325/// Drop the stream in a blocking task to avoid interfering with other actors.
326///
327/// Logically the actor is dropped after we send the barrier with `Drop` mutation to the
328/// downstream, thus making the `drop`'s progress asynchronous. However, there might be a
329/// considerable amount of data in the executors' in-memory cache, dropping these structures might
330/// be a CPU-intensive task. This may lead to the runtime being unable to schedule other actors if
331/// the `drop` is called on the current thread.
332pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
333    let _ = tokio::task::spawn_blocking(move || drop(stream))
334        .instrument_await("drop_stream")
335        .await;
336}