risingwave_stream/executor/
actor.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use chrono_tz::{Tz, UTC};
22use futures::FutureExt;
23use futures::future::join_all;
24use hytra::TrAdder;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::config::StreamingConfig;
27use risingwave_common::hash::VirtualNode;
28use risingwave_common::log::LogSuppressor;
29use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
30use risingwave_common::types::Timestamptz;
31use risingwave_common::util::epoch::EpochPair;
32use risingwave_expr::ExprError;
33use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
34use risingwave_pb::id::SubscriberId;
35use risingwave_pb::plan_common::ExprContext;
36use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
37use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
38use risingwave_rpc_client::MetaClient;
39use thiserror_ext::AsReport;
40use tokio_stream::StreamExt;
41use tracing::Instrument;
42
43use super::StreamConsumer;
44use super::monitor::StreamingMetrics;
45use super::subtask::SubtaskHandle;
46use crate::CONFIG;
47use crate::error::StreamResult;
48use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
49
/// Shared by all operators of an actor.
///
/// Handed around as an [`ActorContextRef`], i.e. behind an `Arc`.
pub struct ActorContext {
    /// Id of the actor this context was built for.
    pub id: ActorId,
    /// Id of the fragment, as passed to [`ActorContext::create`].
    pub fragment_id: FragmentId,
    /// Length of the actor's vnode bitmap, or 1 when the actor has no bitmap (singleton).
    pub vnode_count: usize,
    /// Copy of the `mview_definition` string carried by the actor's build info.
    pub mview_definition: String,

    // TODO(eric): these seem to be useless now?
    /// Value that `cur_mem_val` held before the most recent `store_mem_usage` call.
    last_mem_val: Arc<AtomicUsize>,
    /// Value passed to the most recent `store_mem_usage` call; returned by `mem_usage`.
    cur_mem_val: Arc<AtomicUsize>,
    /// Accumulator that receives the difference between consecutive `store_mem_usage` values.
    total_mem_val: Arc<TrAdder<i64>>,

    /// Metrics handle; `run_consumer` reads the actor count and current epoch gauges from it.
    pub streaming_metrics: Arc<StreamingMetrics>,

    /// This is the number of dispatchers when the actor is created. It will not be updated during runtime when new downstreams are added.
    pub initial_dispatch_num: usize,
    /// Subscriber ids listed in the build info when the actor is created.
    pub initial_subscriber_ids: HashSet<SubscriberId>,
    /// Upstream actors per upstream fragment, as listed in the build info when the actor is
    /// created.
    pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,

    /// Meta client. Currently used for auto schema change. `None` for test only.
    pub meta_client: Option<MetaClient>,

    /// The local streaming configuration for this specific actor.
    ///
    /// Compared to `stream_env.global_config`, this config can have some entries overridden by the user.
    pub config: Arc<StreamingConfig>,

    /// Time zone of the actor. [`ActorContext::create`] parses it from the expression context
    /// and falls back to UTC when the name cannot be resolved.
    pub time_zone: Tz,

    /// Streaming environment handed in when the context is created.
    pub stream_env: StreamEnvironment,
}
82
/// Reference-counted handle to an [`ActorContext`].
pub type ActorContextRef = Arc<ActorContext>;
84
85impl ActorContext {
86    pub fn for_test(id: impl Into<ActorId>) -> ActorContextRef {
87        Self::for_test_with_config(id, StreamingConfig::default())
88    }
89
90    pub fn for_test_with_config(
91        id: impl Into<ActorId>,
92        config: StreamingConfig,
93    ) -> ActorContextRef {
94        Arc::new(Self {
95            id: id.into(),
96            fragment_id: 0.into(),
97            vnode_count: VirtualNode::COUNT_FOR_TEST,
98            mview_definition: "".to_owned(),
99            cur_mem_val: Arc::new(0.into()),
100            last_mem_val: Arc::new(0.into()),
101            total_mem_val: Arc::new(TrAdder::new()),
102            streaming_metrics: Arc::new(StreamingMetrics::unused()),
103            // Set 1 for test to enable sanity check on table
104            initial_dispatch_num: 1,
105            initial_subscriber_ids: Default::default(),
106            initial_upstream_actors: Default::default(),
107            meta_client: None,
108            config: Arc::new(config),
109            time_zone: UTC,
110            stream_env: StreamEnvironment::for_test(),
111        })
112    }
113
114    pub fn create(
115        stream_actor: &BuildActorInfo,
116        fragment_id: FragmentId,
117        total_mem_val: Arc<TrAdder<i64>>,
118        streaming_metrics: Arc<StreamingMetrics>,
119        meta_client: Option<MetaClient>,
120        config: Arc<StreamingConfig>,
121        stream_env: StreamEnvironment,
122    ) -> ActorContextRef {
123        let time_zone = &stream_actor.expr_context.as_ref().unwrap().time_zone;
124        let time_zone = match Timestamptz::lookup_time_zone(time_zone) {
125            Ok(time_zone) => time_zone,
126            Err(err) => {
127                tracing::warn!(
128                    time_zone,
129                    error = %err,
130                    "failed to parse actor time zone, falling back to UTC"
131                );
132                UTC
133            }
134        };
135
136        Arc::new(Self {
137            id: stream_actor.actor_id,
138            fragment_id,
139            mview_definition: stream_actor.mview_definition.clone(),
140            vnode_count: (stream_actor.vnode_bitmap.as_ref())
141                // An unset `vnode_bitmap` means the actor is a singleton,
142                // where only `SINGLETON_VNODE` is set.
143                .map_or(1, |b| Bitmap::from(b).len()),
144            cur_mem_val: Arc::new(0.into()),
145            last_mem_val: Arc::new(0.into()),
146            total_mem_val,
147            streaming_metrics,
148            initial_dispatch_num: stream_actor.dispatchers.len(),
149            initial_subscriber_ids: stream_actor
150                .initial_subscriber_ids
151                .iter()
152                .copied()
153                .collect(),
154            initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
155            meta_client,
156            config,
157            time_zone,
158            stream_env,
159        })
160    }
161
162    pub fn on_compute_error(&self, err: ExprError, identity: &str) {
163        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
164        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
165            tracing::error!(target: "stream_expr_error", identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
166        }
167
168        let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
169        GLOBAL_ERROR_METRICS.user_compute_error.report([
170            "ExprError".to_owned(),
171            executor_name.to_owned(),
172            self.fragment_id.to_string(),
173        ]);
174    }
175
176    pub fn store_mem_usage(&self, val: usize) {
177        // Record the last mem val.
178        // Calculate the difference between old val and new value, and apply the diff to total
179        // memory usage value.
180        let old_value = self.cur_mem_val.load(Ordering::Relaxed);
181        self.last_mem_val.store(old_value, Ordering::Relaxed);
182        let diff = val as i64 - old_value as i64;
183
184        self.total_mem_val.inc(diff);
185
186        self.cur_mem_val.store(val, Ordering::Relaxed);
187    }
188
189    pub fn mem_usage(&self) -> usize {
190        self.cur_mem_val.load(Ordering::Relaxed)
191    }
192}
193
/// `Actor` is the basic execution unit in the streaming framework.
///
/// It owns a consumer and a set of subtasks; `run` drives both until the consumer finishes.
pub struct Actor<C> {
    /// The [`StreamConsumer`] of the actor.
    consumer: C,
    /// The subtasks to execute concurrently.
    subtasks: Vec<SubtaskHandle>,

    /// Context shared by all operators of this actor.
    pub actor_context: ActorContextRef,
    /// Expression context that `run` installs as a scope around the actor's future.
    expr_context: ExprContext,
    /// Barrier manager that every barrier yielded by the consumer is reported to.
    barrier_manager: LocalBarrierManager,
}
205
206impl<C> Actor<C>
207where
208    C: StreamConsumer,
209{
210    pub fn new(
211        consumer: C,
212        subtasks: Vec<SubtaskHandle>,
213        _metrics: Arc<StreamingMetrics>,
214        actor_context: ActorContextRef,
215        expr_context: ExprContext,
216        barrier_manager: LocalBarrierManager,
217    ) -> Self {
218        Self {
219            consumer,
220            subtasks,
221            actor_context,
222            expr_context,
223            barrier_manager,
224        }
225    }
226
227    #[inline(always)]
228    pub async fn run(mut self) -> StreamResult<()> {
229        let expr_context = self.expr_context.clone();
230        let fragment_id = self.actor_context.fragment_id;
231        let vnode_count = self.actor_context.vnode_count;
232        let config = self.actor_context.config.clone();
233
234        let run = async move {
235            tokio::join!(
236                // Drive the subtasks concurrently.
237                join_all(std::mem::take(&mut self.subtasks)),
238                self.run_consumer(),
239            )
240            .1
241        }
242        .boxed();
243
244        // Attach contexts to the future.
245        let run = expr_context_scope(expr_context, run);
246        let run = FRAGMENT_ID::scope(fragment_id, run);
247        let run = VNODE_COUNT::scope(vnode_count, run);
248        let run = CONFIG.scope(config, run);
249
250        run.await
251    }
252
    /// Drives the consumer's barrier stream until the actor is asked to stop or the stream
    /// fails.
    ///
    /// Every barrier yielded by the consumer is reported to the barrier manager through
    /// `collect`. The barrier for which `is_stop` returns true ends the loop and is collected
    /// only after the stream has been dropped on a blocking task.
    ///
    /// # Errors
    ///
    /// Returns the error yielded by the stream, or an "actor exited unexpectedly" error when
    /// the stream ends without a stop barrier. The `start_actors_err` and `collect_actors_err`
    /// fail points return an injected error directly when they are active.
    async fn run_consumer(self) -> StreamResult<()> {
        fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
            "intentional start_actors_err"
        )
        .into()));

        let id = self.actor_context.id;
        let span_name = format!("Actor {id}");

        // Builds the tracing span for one epoch; `parent: None` is passed so the span is not
        // attached to whatever span is current at the call site.
        let new_span = |epoch: Option<EpochPair>| {
            tracing::info_span!(
                parent: None,
                "actor",
                "otel.name" = span_name,
                actor_id = %id,
                prev_epoch = epoch.map(|e| e.prev),
                curr_epoch = epoch.map(|e| e.curr),
            )
        };
        let mut span = new_span(None);

        let actor_count = self
            .actor_context
            .streaming_metrics
            .actor_count
            .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
        // Kept alive until the function returns.
        // NOTE(review): presumably the guard undoes the increment when dropped — confirm
        // against `IntGaugeExt::inc_guard`.
        let _actor_count_guard = actor_count.inc_guard();

        let current_epoch = self
            .actor_context
            .streaming_metrics
            .actor_current_epoch
            .with_guarded_label_values(&[
                &self.actor_context.id.to_string(),
                &self.actor_context.fragment_id.to_string(),
            ]);

        let mut last_epoch: Option<EpochPair> = None;
        let mut stream = Box::pin(Box::new(self.consumer).execute());

        // Drive the streaming task with an infinite loop
        let result = loop {
            let barrier = match stream
                .try_next()
                .instrument(span.clone())
                .instrument_await(
                    last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
                        await_tree::span!("Epoch {}", e.curr)
                    }),
                )
                .await
            {
                Ok(Some(barrier)) => barrier,
                // The stream is expected to end only through a stop barrier.
                Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
                Err(err) => break Err(err),
            };

            fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
                "intentional collect_actors_err"
            )
            .into()));

            // Then stop this actor if asked
            if barrier.is_stop(id) {
                debug!(actor_id = %id, epoch = ?barrier.epoch, "stop at barrier");
                // The stop barrier is not collected here; it leaves the loop and is collected
                // below, after the stream has been dropped.
                break Ok(barrier);
            }

            current_epoch.set(barrier.epoch.curr as i64);

            // Collect barriers to local barrier manager
            self.barrier_manager.collect(id, &barrier);

            // Tracing related work
            last_epoch = Some(barrier.epoch);
            span = barrier.tracing_context().attach(new_span(last_epoch));
        };

        spawn_blocking_drop_stream(stream).await;

        // Turns `Ok(stop_barrier)` into `Ok(())`; an `Err` is passed through untouched.
        let result = result.map(|stop_barrier| {
            // Collect the stop barrier after the stream has been dropped to ensure that all
            // resources held by the stream are released before the barrier is reported.
            self.barrier_manager.collect(id, &stop_barrier);
        });

        tracing::debug!(actor_id = %id, ok = result.is_ok(), "actor exit");
        result
    }
341}
342
343/// Drop the stream in a blocking task to avoid interfering with other actors.
344///
345/// Logically the actor is dropped after we send the barrier with `Drop` mutation to the
346/// downstream, thus making the `drop`'s progress asynchronous. However, there might be a
347/// considerable amount of data in the executors' in-memory cache, dropping these structures might
348/// be a CPU-intensive task. This may lead to the runtime being unable to schedule other actors if
349/// the `drop` is called on the current thread.
350pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
351    let _ = tokio::task::spawn_blocking(move || drop(stream))
352        .instrument_await("drop_stream")
353        .await;
354}