// risingwave_meta/barrier/rpc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::error::Error;
17use std::fmt::{Debug, Formatter};
18use std::future::poll_fn;
19use std::task::{Context, Poll};
20use std::time::Duration;
21
22use anyhow::anyhow;
23use fail::fail_point;
24use futures::StreamExt;
25use futures::future::join_all;
26use itertools::Itertools;
27use risingwave_common::catalog::{DatabaseId, TableId};
28use risingwave_common::util::epoch::Epoch;
29use risingwave_common::util::tracing::TracingContext;
30use risingwave_connector::source::SplitImpl;
31use risingwave_meta_model::WorkerId;
32use risingwave_pb::common::{ActorInfo, WorkerNode};
33use risingwave_pb::hummock::HummockVersionStats;
34use risingwave_pb::stream_plan::barrier_mutation::Mutation;
35use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
36use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
37use risingwave_pb::stream_service::inject_barrier_request::{
38    BuildActorInfo, FragmentBuildActorInfo,
39};
40use risingwave_pb::stream_service::streaming_control_stream_request::{
41    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
42    RemovePartialGraphRequest, ResetDatabaseRequest,
43};
44use risingwave_pb::stream_service::{
45    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
46    streaming_control_stream_request, streaming_control_stream_response,
47};
48use risingwave_rpc_client::StreamingControlHandle;
49use thiserror_ext::AsReport;
50use tokio::time::sleep;
51use tokio_retry::strategy::ExponentialBackoff;
52use tracing::{debug, error, info, warn};
53use uuid::Uuid;
54
55use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
56use crate::barrier::checkpoint::{BarrierWorkerState, DatabaseCheckpointControl};
57use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
60use crate::barrier::progress::CreateMviewProgressTracker;
61use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
62use crate::controller::fragment::InflightFragmentInfo;
63use crate::manager::MetaSrvEnv;
64use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
65use crate::stream::build_actor_connector_splits;
66use crate::{MetaError, MetaResult};
67
68fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
69    job_id
70        .map(|table| {
71            assert_ne!(table.table_id, u32::MAX);
72            table.table_id
73        })
74        .unwrap_or(u32::MAX)
75}
76
77pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
78    if partial_graph_id == u32::MAX {
79        None
80    } else {
81        Some(TableId::new(partial_graph_id))
82    }
83}
84
/// A single connected worker: its node metadata plus the bidirectional
/// streaming-control RPC handle used to send requests and receive responses.
struct ControlStreamNode {
    worker: WorkerNode,
    handle: StreamingControlHandle,
}
89
/// Manages the streaming control streams between the meta barrier worker and
/// all connected compute workers, keyed by worker id.
pub(super) struct ControlStreamManager {
    // Currently connected workers; entries are removed when a stream errors.
    nodes: HashMap<WorkerId, ControlStreamNode>,
    pub(crate) env: MetaSrvEnv,
}
94
impl ControlStreamManager {
    /// Create an empty manager that is not yet connected to any worker.
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            nodes: Default::default(),
            env,
        }
    }

    /// Whether a control stream to `worker_id` is currently established.
    pub(super) fn contains_worker(&self, worker_id: WorkerId) -> bool {
        self.nodes.contains_key(&worker_id)
    }

    /// Try to establish a control stream to `node` once, without retry.
    ///
    /// The init request is built from the current inflight graph info via
    /// [`Self::collect_init_request`]. On failure the error is only logged and
    /// the worker is left unregistered; a no-op if the worker already exists.
    pub(super) async fn try_add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                &InflightDatabaseInfo,
                impl Iterator<Item = &InflightStreamingJobInfo>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        if self.nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "node already exists");
            return;
        }
        let node_host = node.host.clone().unwrap();

        let init_request = self.collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(&node, &init_request).await {
            Ok(handle) => {
                // Guarded by the `contains_key` check above, so this insert
                // must never replace an existing entry.
                assert!(
                    self.nodes
                        .insert(
                            node_id,
                            ControlStreamNode {
                                worker: node.clone(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }

    /// Establish a control stream to `node`, retrying with exponential backoff.
    ///
    /// Like [`Self::try_add_worker`], but retries up to `MAX_RETRY` times,
    /// because the DNS record of a freshly registered worker may not have
    /// propagated to the meta node yet. Gives up (with an error log) after the
    /// retries are exhausted.
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                &InflightDatabaseInfo,
                impl Iterator<Item = &InflightStreamingJobInfo>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        if self.nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "node already exists");
            return;
        }
        let node_host = node.host.clone().unwrap();
        // Backoff schedule: 100ms * 5^i, capped at 3s per attempt.
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = self.collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(&node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker: node.clone(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    // It may happen that the dns information of newly registered worker node
                    // has not been propagated to the meta node and cause error. Wait for a while and retry
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    /// Drop all existing streams and reconnect to the given workers with an
    /// empty (no-database) init request under a new `term_id`.
    ///
    /// Connections are attempted concurrently; returns the ids of the workers
    /// that could not be connected (they are simply left out of `self.nodes`).
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        // Connect to all workers concurrently; each future resolves to
        // (worker_id, node, connection result).
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker: node,
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

    /// Poll all workers' response streams and surface the first ready item.
    ///
    /// Error normalization: end-of-stream, transport errors, empty responses,
    /// `Shutdown`, and unexpected `Init` responses are all mapped to `Err`;
    /// any other response variant is returned as `Ok`. On error the worker is
    /// removed from `self.nodes` (recovery is expected to reconnect it).
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.nodes.is_empty() {
            // NOTE(review): returns `Pending` without registering a waker;
            // presumably the enclosing worker loop re-polls this future when
            // the worker set changes — confirm against the caller.
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            // Iterates in `HashMap` order and stops at the first ready stream;
            // streams polled before that have registered their wakers via
            // `poll_next_unpin`.
            for (worker_id, node) in &mut self.nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                // `None` means the response stream ended unexpectedly.
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(||anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        // Any error (including shutdown) permanently drops the worker's
        // control stream from the manager.
        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    /// Await the next response from any connected worker.
    /// See [`Self::poll_next_response`] for the error normalization applied.
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

    /// Build the `Init` request sent when (re-)establishing a control stream.
    ///
    /// For each database this includes one partial graph for the database
    /// itself (carrying its subscriptions) plus one per in-flight creating
    /// streaming job, each with actor -> host info resolved from the
    /// currently connected workers.
    fn collect_init_request(
        &self,
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                &InflightDatabaseInfo,
                impl Iterator<Item = &InflightStreamingJobInfo>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(
                    |(
                        database_id,
                        subscriptions,
                        database_inflight_info,
                        creating_job_inflight_info,
                    )| {
                        // Resolve the host of every inflight actor from the
                        // connected worker set; panics if an actor's worker is
                        // not connected.
                        fn actor_infos(
                            nodes: &HashMap<WorkerId, ControlStreamNode>,
                            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
                        ) -> Vec<ActorInfo> {
                            {
                                fragment_infos
                                    .flat_map(|fragment| {
                                        fragment.actors.iter().map(|(actor_id, actor)| {
                                            let host_addr = nodes
                                                .get(&actor.worker_id)
                                                .expect("worker should exist for inflight actor")
                                                .worker
                                                .host
                                                .clone()
                                                .expect("should exist");
                                            ActorInfo {
                                                actor_id: *actor_id,
                                                host: Some(host_addr),
                                            }
                                        })
                                    })
                                    .collect()
                            }
                        }
                        // The database-wide default graph (id `u32::MAX`)
                        // carries the subscriptions; creating-job graphs don't.
                        let mut graphs = vec![PbInitialPartialGraph {
                            partial_graph_id: to_partial_graph_id(None),
                            subscriptions: subscriptions.into_iter().collect_vec(),
                            actor_infos: actor_infos(
                                &self.nodes,
                                database_inflight_info.fragment_infos(),
                            ),
                        }];
                        graphs.extend(creating_job_inflight_info.map(|job| {
                            PbInitialPartialGraph {
                                partial_graph_id: to_partial_graph_id(Some(job.job_id)),
                                subscriptions: vec![],
                                actor_infos: actor_infos(&self.nodes, job.fragment_infos()),
                            }
                        }));
                        PbDatabaseInitialPartialGraph {
                            database_id: database_id.database_id,
                            graphs,
                        }
                    },
                )
                .collect(),
            term_id,
        }
    }
}
395
/// Tracks collection of a database's initial (recovery) barrier before the
/// database is handed over to normal checkpoint control via [`Self::finish`].
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    // Workers that still have to report collection of the initial barrier.
    node_to_collect: NodeToCollect,
    // Recovered state used to resume normal barrier issuing afterwards.
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    // Epoch committed before recovery; the initial barrier's `prev_epoch`.
    committed_epoch: u64,
}
403
impl Debug for DatabaseInitialBarrierCollector {
    // Manual impl: only the cheap identifying fields are printed;
    // `database_state` and `create_mview_tracker` are deliberately omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}
412
impl DatabaseInitialBarrierCollector {
    /// Whether the initial barrier has been collected from all pending workers.
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
    }

    /// The recovered barrier worker state of this database.
    pub(super) fn database_state(&self) -> &BarrierWorkerState {
        &self.database_state
    }

    /// Record a barrier-complete response for the initial barrier.
    ///
    /// Panics if the response belongs to a different database or epoch, or if
    /// the reporting worker was not pending collection.
    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        assert_eq!(resp.epoch, self.committed_epoch);
        assert!(
            self.node_to_collect
                .remove(&(resp.worker_id as _))
                .is_some()
        );
    }

    /// Finish collection and convert into the database's checkpoint control.
    ///
    /// Panics if the initial barrier has not been fully collected yet.
    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
        )
    }

    /// Handle a worker failure during collection: returns whether collection
    /// can still complete after removing the worker from the pending set.
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
    }
}
446
447impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    ///
    /// Note: consumes (removes) the entries of `state_table_committed_epochs`
    /// and `source_splits` that belong to this database; `background_jobs` is
    /// only read. Returns a collector that must observe a barrier-complete
    /// response from every involved worker before the database can resume
    /// normal checkpointing.
    ///
    /// Panics if the tables of this database do not all share the same
    /// committed epoch.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        info: InflightDatabaseInfo,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        // Register the database-wide default partial graph on all workers
        // before injecting the barrier into it.
        self.add_partial_graph(database_id, None);
        // Take the split assignments of this database's source actors out of
        // the shared `source_splits` map.
        let source_split_assignments = info
            .fragment_infos()
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery is not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
        });

        // All state tables of the database must have been committed at the
        // same epoch; that common epoch becomes the initial `prev_epoch`.
        let mut epochs = info.existing_table_ids().map(|table_id| {
            (
                table_id,
                state_table_committed_epochs
                    .remove(&table_id)
                    .expect("should exist"),
            )
        });
        let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
        for (table_id, epoch) in epochs {
            assert_eq!(
                prev_epoch, epoch,
                "{} has different committed epoch to {}",
                first_table_id, table_id
            );
        }
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        // Every actor of the database is rebuilt during recovery; resolve the
        // full per-worker build plan from the edge build result.
        let node_actors =
            edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                (
                    fragment_info.fragment_id,
                    &fragment_info.nodes,
                    fragment_info.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                )
            }));

        // Restore progress tracking for background-created materialized views.
        let background_mviews = info.job_ids().filter_map(|job_id| {
            background_jobs
                .get(&job_id)
                .map(|(definition, stream_job_fragments)| {
                    (job_id, (definition.clone(), stream_job_fragments))
                })
        });
        let tracker = CreateMviewProgressTracker::recover(background_mviews, hummock_version_stats);

        let node_to_collect = self.inject_barrier(
            database_id,
            None,
            Some(mutation),
            &barrier_info,
            info.fragment_infos(),
            info.fragment_infos(),
            Some(node_actors),
            (&subscription_info).into_iter().collect(),
            vec![],
        )?;
        debug!(
            ?node_to_collect,
            database_id = database_id.database_id,
            "inject initial barrier"
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state =
            BarrierWorkerState::recovery(new_epoch, info, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            committed_epoch,
        })
    }
560
561    pub(super) fn inject_command_ctx_barrier(
562        &mut self,
563        database_id: DatabaseId,
564        command: Option<&Command>,
565        barrier_info: &BarrierInfo,
566        is_paused: bool,
567        pre_applied_graph_info: &InflightDatabaseInfo,
568        applied_graph_info: &InflightDatabaseInfo,
569        edges: &mut Option<FragmentEdgeBuildResult>,
570    ) -> MetaResult<NodeToCollect> {
571        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges));
572        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
573            add.subscriptions_to_add.clone()
574        } else {
575            vec![]
576        };
577        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
578            drop.info.clone()
579        } else {
580            vec![]
581        };
582        self.inject_barrier(
583            database_id,
584            None,
585            mutation,
586            barrier_info,
587            pre_applied_graph_info.fragment_infos(),
588            applied_graph_info.fragment_infos(),
589            command
590                .as_ref()
591                .map(|command| command.actors_to_create(pre_applied_graph_info, edges))
592                .unwrap_or_default(),
593            subscriptions_to_add,
594            subscriptions_to_remove,
595        )
596    }
597
    /// Send an `InjectBarrier` request for one partial graph to every
    /// connected worker.
    ///
    /// `pre_applied_graph_info` determines which actors must collect the
    /// barrier; `applied_graph_info` determines which state tables to sync.
    /// Actors in `new_actors` are built on their owning worker as part of this
    /// barrier, and their locations are broadcast to all workers.
    ///
    /// Returns the per-worker collection bookkeeping map (`NodeToCollect`).
    ///
    /// # Errors
    /// Fails if some actor lives on a worker without a connected control
    /// stream, or if sending to a stream fails; send failures are also
    /// recorded in the event log.
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        // Test-only failure injection point.
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        // worker id -> actor ids that must collect this barrier.
        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        // Fail fast before sending anything: every involved worker must have a
        // connected control stream.
        for worker_id in node_actors.keys() {
            if !self.nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();
        // Locations of all newly built actors, broadcast to every worker so
        // existing actors can reach the new ones.
        let new_actors_location_to_broadcast = new_actors
            .iter()
            .flatten()
            .flat_map(|(worker_id, actor_infos)| {
                actor_infos
                    .iter()
                    .flat_map(|(_, (_, actors))| actors.iter())
                    .map(|actor_info| ActorInfo {
                        actor_id: actor_info.0.actor_id,
                        host: self
                            .nodes
                            .get(worker_id)
                            .expect("have checked exist previously")
                            .worker
                            .host
                            .clone(),
                    })
            })
            .collect_vec();

        // The barrier is sent to every connected worker, even those with no
        // actors to collect on this partial graph.
        self.nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        // Wrap in `BarrierMutation` only when a mutation exists;
                        // the outer clone feeds `map`, the closure moves the
                        // shadowed clone into the message.
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        broadcast_info: new_actors_location_to_broadcast.clone(),
                                        // This worker's share of the actors to build,
                                        // moved out of `new_actors` (each worker's
                                        // entry is consumed exactly once).
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_iter()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker.id,
                                node.worker.host
                            ))
                        })?;

                    // NOTE(review): the stored flag is `is_empty` (whether this
                    // worker has no actors to collect) — presumably consumed by
                    // `NodeToCollect` bookkeeping; confirm its semantics there.
                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }
760
761    pub(super) fn add_partial_graph(
762        &mut self,
763        database_id: DatabaseId,
764        creating_job_id: Option<TableId>,
765    ) {
766        let partial_graph_id = to_partial_graph_id(creating_job_id);
767        self.nodes.iter().for_each(|(_, node)| {
768            if node
769                .handle
770                .request_sender
771                .send(StreamingControlStreamRequest {
772                    request: Some(
773                        streaming_control_stream_request::Request::CreatePartialGraph(
774                            CreatePartialGraphRequest {
775                                database_id: database_id.database_id,
776                                partial_graph_id,
777                            },
778                        ),
779                    ),
780                }).is_err() {
781                warn!(%database_id, ?creating_job_id, worker_id = node.worker.id, "fail to add partial graph to worker")
782            }
783        });
784    }
785
786    pub(super) fn remove_partial_graph(
787        &mut self,
788        database_id: DatabaseId,
789        creating_job_ids: Vec<TableId>,
790    ) {
791        if creating_job_ids.is_empty() {
792            return;
793        }
794        let partial_graph_ids = creating_job_ids
795            .into_iter()
796            .map(|job_id| to_partial_graph_id(Some(job_id)))
797            .collect_vec();
798        self.nodes.iter().for_each(|(_, node)| {
799            if node.handle
800                .request_sender
801                .send(StreamingControlStreamRequest {
802                    request: Some(
803                        streaming_control_stream_request::Request::RemovePartialGraph(
804                            RemovePartialGraphRequest {
805                                partial_graph_ids: partial_graph_ids.clone(),
806                                database_id: database_id.database_id,
807                            },
808                        ),
809                    ),
810                })
811                .is_err()
812            {
813                warn!(worker_id = node.worker.id,node = ?node.worker.host,"failed to send remove partial graph request");
814            }
815        })
816    }
817
818    pub(super) fn reset_database(
819        &mut self,
820        database_id: DatabaseId,
821        reset_request_id: u32,
822    ) -> HashSet<WorkerId> {
823        self.nodes.iter().filter_map(|(worker_id, node)| {
824            if node.handle
825                .request_sender
826                .send(StreamingControlStreamRequest {
827                    request: Some(
828                        streaming_control_stream_request::Request::ResetDatabase(
829                            ResetDatabaseRequest {
830                                database_id: database_id.database_id,
831                                reset_request_id,
832                            },
833                        ),
834                    ),
835                })
836                .is_err()
837            {
838                warn!(worker_id, node = ?node.worker.host,"failed to send reset database request");
839                None
840            } else {
841                Some(*worker_id)
842            }
843        }).collect()
844    }
845}
846
847impl GlobalBarrierWorkerContextImpl {
848    pub(super) async fn new_control_stream_impl(
849        &self,
850        node: &WorkerNode,
851        init_request: &PbInitRequest,
852    ) -> MetaResult<StreamingControlHandle> {
853        let handle = self
854            .env
855            .stream_client_pool()
856            .get(node)
857            .await?
858            .start_streaming_control(init_request.clone())
859            .await?;
860        Ok(handle)
861    }
862}
863
864pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
865    message: &str,
866    errors: impl IntoIterator<Item = (WorkerId, E)>,
867) -> MetaError {
868    use std::error::request_value;
869    use std::fmt::Write;
870
871    use risingwave_common::error::tonic::extra::Score;
872
873    let errors = errors.into_iter().collect_vec();
874
875    if errors.is_empty() {
876        return anyhow!(message.to_owned()).into();
877    }
878
879    // Create the error from the single error.
880    let single_error = |(worker_id, e)| {
881        anyhow::Error::from(e)
882            .context(format!("{message}, in worker node {worker_id}"))
883            .into()
884    };
885
886    if errors.len() == 1 {
887        return single_error(errors.into_iter().next().unwrap());
888    }
889
890    // Find the error with the highest score.
891    let max_score = errors
892        .iter()
893        .filter_map(|(_, e)| request_value::<Score>(e))
894        .max();
895
896    if let Some(max_score) = max_score {
897        let mut errors = errors;
898        let max_scored = errors
899            .extract_if(.., |(_, e)| request_value::<Score>(e) == Some(max_score))
900            .next()
901            .unwrap();
902
903        return single_error(max_scored);
904    }
905
906    // The errors do not have scores, so simply concatenate them.
907    let concat: String = errors
908        .into_iter()
909        .fold(format!("{message}: "), |mut s, (w, e)| {
910            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
911            s
912        });
913    anyhow!(concat).into()
914}